diff --git a/src/box/wal.cc b/src/box/wal.cc index 82a1d32aed5e2d4dff84485d5334cdd28df61b8e..4d0070c64a3846933a0c3ce8f880a9f8f39fc69a 100644 --- a/src/box/wal.cc +++ b/src/box/wal.cc @@ -66,8 +66,8 @@ wal_flush_input(ev_loop * /* loop */, ev_async *watcher, int /* event */) struct cpipe *pipe = (struct cpipe *) watcher->data; cbus_lock(pipe->bus); - bool input_was_empty = STAILQ_EMPTY(&pipe->pipe); - STAILQ_CONCAT(&pipe->pipe, &pipe->input); + bool input_was_empty = stailq_empty(&pipe->pipe); + stailq_concat(&pipe->pipe, &pipe->input); cbus_unlock(pipe->bus); if (input_was_empty) @@ -90,34 +90,36 @@ wal_flush_input(ev_loop * /* loop */, ev_async *watcher, int /* event */) * call in the writer thread loop). */ static void -tx_schedule_queue(struct cmsg_fifo *queue) +tx_schedule_queue(struct stailq *queue) { /* - * Can't use STAILQ_FOREACH since fiber_call() + * Can't use stailq_foreach since fiber_call() * destroys the list entry. */ - struct cmsg *m, *tmp; - STAILQ_FOREACH_SAFE(m, queue, fifo, tmp) - fiber_call(((struct wal_request *) m)->fiber); + struct wal_request *req, *tmp; + stailq_foreach_entry_safe(req, tmp, queue, fifo) + fiber_call(req->fiber); } static void tx_fetch_output(ev_loop * /* loop */, ev_async *watcher, int /* event */) { struct wal_writer *writer = (struct wal_writer *) watcher->data; - struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit); - struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback); + struct stailq commit; + struct stailq rollback; + stailq_create(&commit); + stailq_create(&rollback); bool is_rollback; cbus_lock(&writer->tx_wal_bus); - STAILQ_CONCAT(&commit, &writer->tx_pipe.pipe); + stailq_concat(&commit, &writer->tx_pipe.pipe); is_rollback = writer->is_rollback; if (is_rollback) - STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe); + stailq_concat(&rollback, &writer->wal_pipe.pipe); writer->is_rollback = false; cbus_unlock(&writer->tx_wal_bus); if (is_rollback) - STAILQ_CONCAT(&rollback, &writer->wal_pipe.input); + stailq_concat(&rollback, &writer->wal_pipe.input); tx_schedule_queue(&commit); /* @@ -127,7 +129,7 @@ tx_fetch_output(ev_loop * /* loop */, ev_async *watcher, int /* event */) * in reverse order, performing a playback of the * in-memory database state. */ - STAILQ_REVERSE(&rollback, cmsg, fifo); + stailq_reverse(&rollback); tx_schedule_queue(&rollback); } @@ -304,8 +306,8 @@ wal_writer_pop(struct wal_writer *writer) while (! writer->is_shutdown) { if (! writer->is_rollback && - ! STAILQ_EMPTY(&writer->wal_pipe.pipe)) { - STAILQ_CONCAT(&writer->wal_pipe.output, + ! stailq_empty(&writer->wal_pipe.pipe)) { + stailq_concat(&writer->wal_pipe.output, &writer->wal_pipe.pipe); break; } @@ -315,19 +317,19 @@ wal_writer_pop(struct wal_writer *writer) static void wal_write_to_disk(struct recovery *r, struct wal_writer *writer, - struct cmsg_fifo *input, struct cmsg_fifo *commit, - struct cmsg_fifo *rollback) + struct stailq *input, struct stailq *commit, + struct stailq *rollback) { /* * Input queue can only be empty on wal writer shutdown. * In this case wal_opt_rotate can create an extra empty xlog. */ - if (unlikely(STAILQ_EMPTY(input))) + if (unlikely(stailq_empty(input))) return; /* Xlog is only rotated between queue processing */ if (wal_opt_rotate(&r->current_wal, r, &writer->vclock) != 0) { - STAILQ_SPLICE(input, STAILQ_FIRST(input), fifo, rollback); + stailq_concat(rollback, input); return; } @@ -365,9 +367,7 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer, * Iterate over requests (transactions) */ struct wal_request *req; - for (req = (struct wal_request *) STAILQ_FIRST(input); - req != NULL; - req = (struct wal_request *) STAILQ_NEXT(req, fifo)) { + stailq_foreach_entry(req, input, fifo) { /* Save relative offset of request start */ req->start_offset = batched_bytes; req->end_offset = -1; @@ -418,9 +418,9 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer, * `commit` queue and all other to `rollback` queue. */ struct wal_request *reqend = req; - for (req = (struct wal_request *) STAILQ_FIRST(input); + for (req = stailq_first_entry(input, struct wal_request, fifo); req != reqend; - req = (struct wal_request *) STAILQ_NEXT(req, fifo)) { + req = stailq_next_entry(req, fifo)) { /* * Check if request has been fully written to xlog. */ @@ -448,7 +448,7 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer, written_bytes = req->start_offset; /* Move tail to `rollback` queue. */ - STAILQ_SPLICE(input, req, fifo, rollback); + stailq_splice(input, &req->fifo, rollback); break; } @@ -464,7 +464,7 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer, fiber_gc(); /* Move all processed requests to `commit` queue */ - STAILQ_CONCAT(commit, input); + stailq_concat(commit, input); return; } @@ -479,8 +479,10 @@ wal_writer_f(va_list ap) cpipe_create(&writer->wal_pipe); cbus_join(&writer->tx_wal_bus, &writer->wal_pipe); - struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit); - struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback); + struct stailq commit; + struct stailq rollback; + stailq_create(&commit); + stailq_create(&rollback); cbus_lock(&writer->tx_wal_bus); while (! writer->is_shutdown) { @@ -498,8 +500,8 @@ wal_writer_f(va_list ap) tt_pthread_mutex_unlock(&writer->watchers_mutex); cbus_lock(&writer->tx_wal_bus); - STAILQ_CONCAT(&writer->tx_pipe.pipe, &commit); - if (! STAILQ_EMPTY(&rollback)) { + stailq_concat(&writer->tx_pipe.pipe, &commit); + if (! stailq_empty(&rollback)) { /* * Begin rollback: create a rollback queue * from all requests which were not @@ -507,8 +509,8 @@ wal_writer_f(va_list ap) * input queue. */ writer->is_rollback = true; - STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe); - STAILQ_CONCAT(&writer->wal_pipe.pipe, &rollback); + stailq_concat(&rollback, &writer->wal_pipe.pipe); + stailq_concat(&writer->wal_pipe.pipe, &rollback); } ev_async_send(writer->tx_pipe.consumer, &writer->tx_pipe.fetch_output); diff --git a/src/box/wal.h b/src/box/wal.h index 3a08d5ab003d1a8cc5c9b93c4ce0924f3a3c169c..79a837ce18eceb713343ac4fea4db99435b0bea8 100644 --- a/src/box/wal.h +++ b/src/box/wal.h @@ -30,7 +30,6 @@ * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF * SUCH DAMAGE. */ -#include <third_party/queue.h> #include <stdint.h> #include "cbus.h" #include "small/rlist.h" diff --git a/src/cbus.c b/src/cbus.c index bdd98ab67a5df2c20de63b5054ba542401ea8e84..cfec74600b3131e523cd1d250234dbe8ebfa9c55 100644 --- a/src/cbus.c +++ b/src/cbus.c @@ -60,9 +60,9 @@ cpipe_fetch_output_cb(ev_loop *loop, struct ev_async *watcher, void cpipe_create(struct cpipe *pipe) { - STAILQ_INIT(&pipe->pipe); - STAILQ_INIT(&pipe->input); - STAILQ_INIT(&pipe->output); + stailq_create(&pipe->pipe); + stailq_create(&pipe->input); + stailq_create(&pipe->output); pipe->n_input = 0; pipe->max_input = INT_MAX; @@ -179,19 +179,19 @@ cbus_flush_cb(ev_loop *loop, struct ev_async *watcher, /* Trigger task processing when the queue becomes non-empty. */ bool pipe_was_empty; - bool peer_output_was_empty = STAILQ_EMPTY(&peer->output); + bool peer_output_was_empty = stailq_empty(&peer->output); cbus_lock(pipe->bus); pipe_was_empty = !ev_async_pending(&pipe->fetch_output); /** Flush input */ - STAILQ_CONCAT(&pipe->pipe, &pipe->input); + stailq_concat(&pipe->pipe, &pipe->input); /* * While at it, pop output. * The consumer of the output of the bound queue is the * same as the producer of input, so we can safely access it. * We can safely access queue because it's locked. */ - STAILQ_CONCAT(&peer->output, &peer->pipe); + stailq_concat(&peer->output, &peer->pipe); cbus_unlock(pipe->bus); @@ -202,14 +202,14 @@ cbus_flush_cb(ev_loop *loop, struct ev_async *watcher, ev_async_send(pipe->consumer, &pipe->fetch_output); } - if (peer_output_was_empty && !STAILQ_EMPTY(&peer->output)) + if (peer_output_was_empty && !stailq_empty(&peer->output)) ev_feed_event(peer->consumer, &peer->fetch_output, EV_CUSTOM); } struct cmsg * cpipe_peek_impl(struct cpipe *pipe) { - assert(STAILQ_EMPTY(&pipe->output)); + assert(stailq_empty(&pipe->output)); struct cpipe *peer = pipe->peer; assert(pipe->consumer == loop()); @@ -219,10 +219,10 @@ cpipe_peek_impl(struct cpipe *pipe) cbus_lock(pipe->bus); - STAILQ_CONCAT(&pipe->output, &pipe->pipe); - if (! STAILQ_EMPTY(&peer->input)) { + stailq_concat(&pipe->output, &pipe->pipe); + if (! stailq_empty(&peer->input)) { peer_pipe_was_empty = !ev_async_pending(&peer->fetch_output); - STAILQ_CONCAT(&peer->pipe, &peer->input); + stailq_concat(&peer->pipe, &peer->input); } cbus_unlock(pipe->bus); peer->n_input = 0; @@ -233,7 +233,7 @@ cpipe_peek_impl(struct cpipe *pipe) ev_async_send(peer->consumer, &peer->fetch_output); } - return STAILQ_FIRST(&pipe->output); + return stailq_first_entry(&pipe->output, struct cmsg, fifo); } @@ -301,7 +301,7 @@ cpipe_fiber_pool_cb(ev_loop *loop, struct ev_async *watcher, (struct cpipe_fiber_pool *) watcher->data; struct cpipe *pipe = pool->pipe; (void) cpipe_peek(pipe); - while (! STAILQ_EMPTY(&pipe->output)) { + while (! stailq_empty(&pipe->output)) { struct fiber *f; if (! rlist_empty(&pool->fiber_cache)) { f = rlist_shift_entry(&pool->fiber_cache, diff --git a/src/cbus.h b/src/cbus.h index fb27140b63d0ef960f583a47c10c067ebce267c1..cf4286a1a7789bd848c059733e894968195e1513 100644 --- a/src/cbus.h +++ b/src/cbus.h @@ -32,7 +32,7 @@ */ #include "fiber.h" #include "rmean.h" -#include "third_party/queue.h" +#include "salad/stailq.h" #if defined(__cplusplus) extern "C" { @@ -87,7 +87,7 @@ struct cmsg { * message is stuck in currently, waiting to get * delivered. */ - STAILQ_ENTRY(cmsg) fifo; + struct stailq_entry fifo; /** The message routing path. */ struct cmsg_hop *route; /** The current hop the message is at. */ @@ -105,8 +105,6 @@ cmsg_init(struct cmsg *msg, struct cmsg_hop *route) msg->hop = msg->route = route; } -STAILQ_HEAD(cmsg_fifo, cmsg); - #define CACHELINE_SIZE 64 /** A uni-directional FIFO queue from one cord to another. */ struct cpipe { @@ -121,7 +119,7 @@ struct cpipe { * output <-- owned by the producer thread */ struct { - struct cmsg_fifo pipe; + struct stailq pipe; /** Peer pipe - the other direction of the bus. */ struct cpipe *peer; struct cbus *bus; @@ -129,7 +127,7 @@ struct cpipe { /** Stuff most actively used in the producer thread. */ struct { /** Staging area for pushed messages */ - struct cmsg_fifo input; + struct stailq input; /** Counters are useful for finer-grained scheduling. */ int n_input; /** @@ -152,7 +150,7 @@ struct cpipe { /** Stuff related to the consumer. */ struct { /** Staged messages (for pop) */ - struct cmsg_fifo output; + struct stailq output; /** * Used to trigger task processing when * the pipe becomes non-empty. @@ -214,11 +212,9 @@ cpipe_pop_output(struct cpipe *pipe) { assert(loop() == pipe->consumer); - if (STAILQ_EMPTY(&pipe->output)) + if (stailq_empty(&pipe->output)) return NULL; - struct cmsg *msg = STAILQ_FIRST(&pipe->output); - STAILQ_REMOVE_HEAD(&pipe->output, fifo); - return msg; + return stailq_shift_entry(&pipe->output, struct cmsg, fifo); } struct cmsg * @@ -233,10 +229,10 @@ cpipe_peek(struct cpipe *pipe) { assert(loop() == pipe->consumer); - if (STAILQ_EMPTY(&pipe->output)) + if (stailq_empty(&pipe->output)) return cpipe_peek_impl(pipe); - return STAILQ_FIRST(&pipe->output); + return stailq_first_entry(&pipe->output, struct cmsg, fifo); } /** @@ -310,7 +306,7 @@ cpipe_push_input(struct cpipe *pipe, struct cmsg *msg) { assert(loop() == pipe->producer); - STAILQ_INSERT_TAIL(&pipe->input, msg, fifo); + stailq_add_tail_entry(&pipe->input, msg, fifo); pipe->n_input++; if (pipe->n_input >= pipe->max_input) ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM); diff --git a/test/unit/stailq.result b/test/unit/stailq.result index 09225c0adf588173134d9d488c0a85878b40d914..debebf52db49893f2c197a6da6ad61ab86103acd 100644 --- a/test/unit/stailq.result +++ b/test/unit/stailq.result @@ -1,4 +1,4 @@ -1..29 +1..37 ok 1 - list is empty ok 2 - list is empty after reverse ok 3 - first item @@ -13,18 +13,26 @@ ok 11 - element (foreach) 6 ok 12 - first item ok 13 - head is not empty ok 14 - first entry -ok 15 - next is empty -ok 16 - element (foreach_entry) 6 -ok 17 - element (foreach_entry) 5 -ok 18 - element (foreach_entry) 4 -ok 19 - element (foreach_entry) 3 -ok 20 - element (foreach_entry) 2 -ok 21 - element (foreach_entry) 1 -ok 22 - element (foreach_entry) 0 -ok 23 - element (foreach_entry) 0 -ok 24 - element (foreach_entry) 1 -ok 25 - element (foreach_entry) 2 -ok 26 - element (foreach_entry) 3 -ok 27 - element (foreach_entry) 4 -ok 28 - element (foreach_entry) 5 -ok 29 - element (foreach_entry) 6 +ok 15 - shift item 0 +ok 16 - shift item 1 +ok 17 - shift item 2 +ok 18 - shift item 3 +ok 19 - shift item 4 +ok 20 - shift item 5 +ok 21 - shift item 6 +ok 22 - list is empty after shift +ok 23 - next is empty +ok 24 - element (foreach_entry) 6 +ok 25 - element (foreach_entry) 5 +ok 26 - element (foreach_entry) 4 +ok 27 - element (foreach_entry) 3 +ok 28 - element (foreach_entry) 2 +ok 29 - element (foreach_entry) 1 +ok 30 - element (foreach_entry) 0 +ok 31 - element (foreach_entry) 0 +ok 32 - element (foreach_entry) 1 +ok 33 - element (foreach_entry) 2 +ok 34 - element (foreach_entry) 3 +ok 35 - element (foreach_entry) 4 +ok 36 - element (foreach_entry) 5 +ok 37 - element (foreach_entry) 6