From 92db0df7da1fa7c4724bb53ab869a4f2f30ea67a Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Thu, 12 Nov 2015 21:30:24 +0300
Subject: [PATCH] wal: use stailq instead of STAILQ

---
 src/box/wal.cc          | 66 +++++++++++++++++++++--------------------
 src/box/wal.h           |  1 -
 src/cbus.c              | 26 ++++++++--------
 src/cbus.h              | 24 +++++++--------
 test/unit/stailq.result | 40 +++++++++++++++----------
 5 files changed, 81 insertions(+), 76 deletions(-)

diff --git a/src/box/wal.cc b/src/box/wal.cc
index 82a1d32aed..4d0070c64a 100644
--- a/src/box/wal.cc
+++ b/src/box/wal.cc
@@ -66,8 +66,8 @@ wal_flush_input(ev_loop * /* loop */, ev_async *watcher, int /* event */)
 	struct cpipe *pipe = (struct cpipe *) watcher->data;
 
 	cbus_lock(pipe->bus);
-	bool input_was_empty = STAILQ_EMPTY(&pipe->pipe);
-	STAILQ_CONCAT(&pipe->pipe, &pipe->input);
+	bool input_was_empty = stailq_empty(&pipe->pipe);
+	stailq_concat(&pipe->pipe, &pipe->input);
 	cbus_unlock(pipe->bus);
 
 	if (input_was_empty)
@@ -90,34 +90,36 @@ wal_flush_input(ev_loop * /* loop */, ev_async *watcher, int /* event */)
  * call in the writer thread loop).
  */
 static void
-tx_schedule_queue(struct cmsg_fifo *queue)
+tx_schedule_queue(struct stailq *queue)
 {
 	/*
-	 * Can't use STAILQ_FOREACH since fiber_call()
+	 * Can't use stailq_foreach since fiber_call()
 	 * destroys the list entry.
 	 */
-	struct cmsg *m, *tmp;
-	STAILQ_FOREACH_SAFE(m, queue, fifo, tmp)
-		fiber_call(((struct wal_request *) m)->fiber);
+	struct wal_request *req, *tmp;
+	stailq_foreach_entry_safe(req, tmp, queue, fifo)
+		fiber_call(req->fiber);
 }
 
 static void
 tx_fetch_output(ev_loop * /* loop */, ev_async *watcher, int /* event */)
 {
 	struct wal_writer *writer = (struct wal_writer *) watcher->data;
-	struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit);
-	struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback);
+	struct stailq commit;
+	struct stailq rollback;
+	stailq_create(&commit);
+	stailq_create(&rollback);
 
 	bool is_rollback;
 	cbus_lock(&writer->tx_wal_bus);
-	STAILQ_CONCAT(&commit, &writer->tx_pipe.pipe);
+	stailq_concat(&commit, &writer->tx_pipe.pipe);
 	is_rollback = writer->is_rollback;
 	if (is_rollback)
-		STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe);
+		stailq_concat(&rollback, &writer->wal_pipe.pipe);
 	writer->is_rollback = false;
 	cbus_unlock(&writer->tx_wal_bus);
 	if (is_rollback)
-		STAILQ_CONCAT(&rollback, &writer->wal_pipe.input);
+		stailq_concat(&rollback, &writer->wal_pipe.input);
 
 	tx_schedule_queue(&commit);
 	/*
@@ -127,7 +129,7 @@ tx_fetch_output(ev_loop * /* loop */, ev_async *watcher, int /* event */)
 	 * in reverse order, performing a playback of the
 	 * in-memory database state.
 	 */
-	STAILQ_REVERSE(&rollback, cmsg, fifo);
+	stailq_reverse(&rollback);
 	tx_schedule_queue(&rollback);
 }
 
@@ -304,8 +306,8 @@ wal_writer_pop(struct wal_writer *writer)
 	while (! writer->is_shutdown)
 	{
 		if (! writer->is_rollback &&
-		    ! STAILQ_EMPTY(&writer->wal_pipe.pipe)) {
-			STAILQ_CONCAT(&writer->wal_pipe.output,
+		    ! stailq_empty(&writer->wal_pipe.pipe)) {
+			stailq_concat(&writer->wal_pipe.output,
 				      &writer->wal_pipe.pipe);
 			break;
 		}
@@ -315,19 +317,19 @@ wal_writer_pop(struct wal_writer *writer)
 
 static void
 wal_write_to_disk(struct recovery *r, struct wal_writer *writer,
-		  struct cmsg_fifo *input, struct cmsg_fifo *commit,
-		  struct cmsg_fifo *rollback)
+		  struct stailq *input, struct stailq *commit,
+		  struct stailq *rollback)
 {
 	/*
 	 * Input queue can only be empty on wal writer shutdown.
 	 * In this case wal_opt_rotate can create an extra empty xlog.
 	 */
-	if (unlikely(STAILQ_EMPTY(input)))
+	if (unlikely(stailq_empty(input)))
 		return;
 
 	/* Xlog is only rotated between queue processing  */
 	if (wal_opt_rotate(&r->current_wal, r, &writer->vclock) != 0) {
-		STAILQ_SPLICE(input, STAILQ_FIRST(input), fifo, rollback);
+		stailq_concat(rollback, input);
 		return;
 	}
 
@@ -365,9 +367,7 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer,
 	 * Iterate over requests (transactions)
 	 */
 	struct wal_request *req;
-	for (req = (struct wal_request *) STAILQ_FIRST(input);
-	     req != NULL;
-	     req = (struct wal_request *) STAILQ_NEXT(req, fifo)) {
+	stailq_foreach_entry(req, input, fifo) {
 		/* Save relative offset of request start */
 		req->start_offset = batched_bytes;
 		req->end_offset = -1;
@@ -418,9 +418,9 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer,
 	 * `commit` queue and all other to `rollback` queue.
 	 */
 	struct wal_request *reqend = req;
-	for (req = (struct wal_request *) STAILQ_FIRST(input);
+	for (req = stailq_first_entry(input, struct wal_request, fifo);
 	     req != reqend;
-	     req = (struct wal_request *) STAILQ_NEXT(req, fifo)) {
+	     req = stailq_next_entry(req, fifo)) {
 		/*
 		 * Check if request has been fully written to xlog.
 		 */
@@ -448,7 +448,7 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer,
 			written_bytes = req->start_offset;
 
 			/* Move tail to `rollback` queue. */
-			STAILQ_SPLICE(input, req, fifo, rollback);
+			stailq_splice(input, &req->fifo, rollback);
 			break;
 		}
 
@@ -464,7 +464,7 @@ wal_write_to_disk(struct recovery *r, struct wal_writer *writer,
 
 	fiber_gc();
 	/* Move all processed requests to `commit` queue */
-	STAILQ_CONCAT(commit, input);
+	stailq_concat(commit, input);
 	return;
 }
 
@@ -479,8 +479,10 @@ wal_writer_f(va_list ap)
 	cpipe_create(&writer->wal_pipe);
 	cbus_join(&writer->tx_wal_bus, &writer->wal_pipe);
 
-	struct cmsg_fifo commit = STAILQ_HEAD_INITIALIZER(commit);
-	struct cmsg_fifo rollback = STAILQ_HEAD_INITIALIZER(rollback);
+	struct stailq commit;
+	struct stailq rollback;
+	stailq_create(&commit);
+	stailq_create(&rollback);
 
 	cbus_lock(&writer->tx_wal_bus);
 	while (! writer->is_shutdown) {
@@ -498,8 +500,8 @@ wal_writer_f(va_list ap)
 		tt_pthread_mutex_unlock(&writer->watchers_mutex);
 
 		cbus_lock(&writer->tx_wal_bus);
-		STAILQ_CONCAT(&writer->tx_pipe.pipe, &commit);
-		if (! STAILQ_EMPTY(&rollback)) {
+		stailq_concat(&writer->tx_pipe.pipe, &commit);
+		if (! stailq_empty(&rollback)) {
 			/*
 			 * Begin rollback: create a rollback queue
 			 * from all requests which were not
@@ -507,8 +509,8 @@ wal_writer_f(va_list ap)
 			 * input queue.
 			 */
 			writer->is_rollback = true;
-			STAILQ_CONCAT(&rollback, &writer->wal_pipe.pipe);
-			STAILQ_CONCAT(&writer->wal_pipe.pipe, &rollback);
+			stailq_concat(&rollback, &writer->wal_pipe.pipe);
+			stailq_concat(&writer->wal_pipe.pipe, &rollback);
 		}
 		ev_async_send(writer->tx_pipe.consumer,
 			      &writer->tx_pipe.fetch_output);
diff --git a/src/box/wal.h b/src/box/wal.h
index 3a08d5ab00..79a837ce18 100644
--- a/src/box/wal.h
+++ b/src/box/wal.h
@@ -30,7 +30,6 @@
  * THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
  * SUCH DAMAGE.
  */
-#include  <third_party/queue.h>
 #include <stdint.h>
 #include "cbus.h"
 #include "small/rlist.h"
diff --git a/src/cbus.c b/src/cbus.c
index bdd98ab67a..cfec74600b 100644
--- a/src/cbus.c
+++ b/src/cbus.c
@@ -60,9 +60,9 @@ cpipe_fetch_output_cb(ev_loop *loop, struct ev_async *watcher,
 void
 cpipe_create(struct cpipe *pipe)
 {
-	STAILQ_INIT(&pipe->pipe);
-	STAILQ_INIT(&pipe->input);
-	STAILQ_INIT(&pipe->output);
+	stailq_create(&pipe->pipe);
+	stailq_create(&pipe->input);
+	stailq_create(&pipe->output);
 
 	pipe->n_input = 0;
 	pipe->max_input = INT_MAX;
@@ -179,19 +179,19 @@ cbus_flush_cb(ev_loop *loop, struct ev_async *watcher,
 
 	/* Trigger task processing when the queue becomes non-empty. */
 	bool pipe_was_empty;
-	bool peer_output_was_empty = STAILQ_EMPTY(&peer->output);
+	bool peer_output_was_empty = stailq_empty(&peer->output);
 
 	cbus_lock(pipe->bus);
 	pipe_was_empty = !ev_async_pending(&pipe->fetch_output);
 	/** Flush input */
-	STAILQ_CONCAT(&pipe->pipe, &pipe->input);
+	stailq_concat(&pipe->pipe, &pipe->input);
 	/*
 	 * While at it, pop output.
 	 * The consumer of the output of the bound queue is the
 	 * same as the producer of input, so we can safely access it.
 	 * We can safely access queue because it's locked.
 	 */
-	STAILQ_CONCAT(&peer->output, &peer->pipe);
+	stailq_concat(&peer->output, &peer->pipe);
 	cbus_unlock(pipe->bus);
 
 
@@ -202,14 +202,14 @@ cbus_flush_cb(ev_loop *loop, struct ev_async *watcher,
 
 		ev_async_send(pipe->consumer, &pipe->fetch_output);
 	}
-	if (peer_output_was_empty && !STAILQ_EMPTY(&peer->output))
+	if (peer_output_was_empty && !stailq_empty(&peer->output))
 		ev_feed_event(peer->consumer, &peer->fetch_output, EV_CUSTOM);
 }
 
 struct cmsg *
 cpipe_peek_impl(struct cpipe *pipe)
 {
-	assert(STAILQ_EMPTY(&pipe->output));
+	assert(stailq_empty(&pipe->output));
 
 	struct cpipe *peer = pipe->peer;
 	assert(pipe->consumer == loop());
@@ -219,10 +219,10 @@ cpipe_peek_impl(struct cpipe *pipe)
 
 
 	cbus_lock(pipe->bus);
-	STAILQ_CONCAT(&pipe->output, &pipe->pipe);
-	if (! STAILQ_EMPTY(&peer->input)) {
+	stailq_concat(&pipe->output, &pipe->pipe);
+	if (! stailq_empty(&peer->input)) {
 		peer_pipe_was_empty = !ev_async_pending(&peer->fetch_output);
-		STAILQ_CONCAT(&peer->pipe, &peer->input);
+		stailq_concat(&peer->pipe, &peer->input);
 	}
 	cbus_unlock(pipe->bus);
 	peer->n_input = 0;
@@ -233,7 +233,7 @@ cpipe_peek_impl(struct cpipe *pipe)
 
 		ev_async_send(peer->consumer, &peer->fetch_output);
 	}
-	return STAILQ_FIRST(&pipe->output);
+	return stailq_first_entry(&pipe->output, struct cmsg, fifo);
 }
 
 
@@ -301,7 +301,7 @@ cpipe_fiber_pool_cb(ev_loop *loop, struct ev_async *watcher,
 		(struct cpipe_fiber_pool *) watcher->data;
 	struct cpipe *pipe = pool->pipe;
 	(void) cpipe_peek(pipe);
-	while (! STAILQ_EMPTY(&pipe->output)) {
+	while (! stailq_empty(&pipe->output)) {
 		struct fiber *f;
 		if (! rlist_empty(&pool->fiber_cache)) {
 			f = rlist_shift_entry(&pool->fiber_cache,
diff --git a/src/cbus.h b/src/cbus.h
index fb27140b63..cf4286a1a7 100644
--- a/src/cbus.h
+++ b/src/cbus.h
@@ -32,7 +32,7 @@
  */
 #include "fiber.h"
 #include "rmean.h"
-#include "third_party/queue.h"
+#include "salad/stailq.h"
 
 #if defined(__cplusplus)
 extern "C" {
@@ -87,7 +87,7 @@ struct cmsg {
 	 * message is stuck in currently, waiting to get
 	 * delivered.
 	 */
-	STAILQ_ENTRY(cmsg) fifo;
+	struct stailq_entry fifo;
 	/** The message routing path. */
 	struct cmsg_hop *route;
 	/** The current hop the message is at. */
@@ -105,8 +105,6 @@ cmsg_init(struct cmsg *msg, struct cmsg_hop *route)
 	msg->hop = msg->route = route;
 }
 
-STAILQ_HEAD(cmsg_fifo, cmsg);
-
 #define CACHELINE_SIZE 64
 /** A  uni-directional FIFO queue from one cord to another. */
 struct cpipe {
@@ -121,7 +119,7 @@ struct cpipe {
 	 *     output      <-- owned by the producer thread
 	 */
 	struct {
-		struct cmsg_fifo pipe;
+		struct stailq pipe;
 		/** Peer pipe - the other direction of the bus. */
 		struct cpipe *peer;
 		struct cbus *bus;
@@ -129,7 +127,7 @@ struct cpipe {
 	/** Stuff most actively used in the producer thread. */
 	struct {
 		/** Staging area for pushed messages */
-		struct cmsg_fifo input;
+		struct stailq input;
 		/** Counters are useful for finer-grained scheduling. */
 		int n_input;
 		/**
@@ -152,7 +150,7 @@ struct cpipe {
 	/** Stuff related to the consumer. */
 	struct {
 		/** Staged messages (for pop) */
-		struct cmsg_fifo output;
+		struct stailq output;
 		/**
 		 * Used to trigger task processing when
 		 * the pipe becomes non-empty.
@@ -214,11 +212,9 @@ cpipe_pop_output(struct cpipe *pipe)
 {
 	assert(loop() == pipe->consumer);
 
-	if (STAILQ_EMPTY(&pipe->output))
+	if (stailq_empty(&pipe->output))
 		return NULL;
-	struct cmsg *msg = STAILQ_FIRST(&pipe->output);
-	STAILQ_REMOVE_HEAD(&pipe->output, fifo);
-	return msg;
+	return stailq_shift_entry(&pipe->output, struct cmsg, fifo);
 }
 
 struct cmsg *
@@ -233,10 +229,10 @@ cpipe_peek(struct cpipe *pipe)
 {
 	assert(loop() == pipe->consumer);
 
-	if (STAILQ_EMPTY(&pipe->output))
+	if (stailq_empty(&pipe->output))
 		return cpipe_peek_impl(pipe);
 
-	return STAILQ_FIRST(&pipe->output);
+	return stailq_first_entry(&pipe->output, struct cmsg, fifo);
 }
 
 /**
@@ -310,7 +306,7 @@ cpipe_push_input(struct cpipe *pipe, struct cmsg *msg)
 {
 	assert(loop() == pipe->producer);
 
-	STAILQ_INSERT_TAIL(&pipe->input, msg, fifo);
+	stailq_add_tail_entry(&pipe->input, msg, fifo);
 	pipe->n_input++;
 	if (pipe->n_input >= pipe->max_input)
 		ev_invoke(pipe->producer, &pipe->flush_input, EV_CUSTOM);
diff --git a/test/unit/stailq.result b/test/unit/stailq.result
index 09225c0adf..debebf52db 100644
--- a/test/unit/stailq.result
+++ b/test/unit/stailq.result
@@ -1,4 +1,4 @@
-1..29
+1..37
 ok 1 - list is empty
 ok 2 - list is empty after reverse
 ok 3 - first item
@@ -13,18 +13,26 @@ ok 11 - element (foreach) 6
 ok 12 - first item
 ok 13 - head is not empty
 ok 14 - first entry
-ok 15 - next is empty
-ok 16 - element (foreach_entry) 6
-ok 17 - element (foreach_entry) 5
-ok 18 - element (foreach_entry) 4
-ok 19 - element (foreach_entry) 3
-ok 20 - element (foreach_entry) 2
-ok 21 - element (foreach_entry) 1
-ok 22 - element (foreach_entry) 0
-ok 23 - element (foreach_entry) 0
-ok 24 - element (foreach_entry) 1
-ok 25 - element (foreach_entry) 2
-ok 26 - element (foreach_entry) 3
-ok 27 - element (foreach_entry) 4
-ok 28 - element (foreach_entry) 5
-ok 29 - element (foreach_entry) 6
+ok 15 - shift item 0
+ok 16 - shift item 1
+ok 17 - shift item 2
+ok 18 - shift item 3
+ok 19 - shift item 4
+ok 20 - shift item 5
+ok 21 - shift item 6
+ok 22 - list is empty after shift
+ok 23 - next is empty
+ok 24 - element (foreach_entry) 6
+ok 25 - element (foreach_entry) 5
+ok 26 - element (foreach_entry) 4
+ok 27 - element (foreach_entry) 3
+ok 28 - element (foreach_entry) 2
+ok 29 - element (foreach_entry) 1
+ok 30 - element (foreach_entry) 0
+ok 31 - element (foreach_entry) 0
+ok 32 - element (foreach_entry) 1
+ok 33 - element (foreach_entry) 2
+ok 34 - element (foreach_entry) 3
+ok 35 - element (foreach_entry) 4
+ok 36 - element (foreach_entry) 5
+ok 37 - element (foreach_entry) 6
-- 
GitLab