From fafa3465549b0d0c0f963f0b5060409b06fdd420 Mon Sep 17 00:00:00 2001
From: Konstantin Osipov <kostja@tarantool.org>
Date: Sat, 9 Feb 2013 13:25:29 +0400
Subject: [PATCH] Fix two bugs in the new async I/O.

There were two cases when the new async I/O worked incorrectly.
The problems only revealed themselves under specific
concurrency circumstances.

In one case, the client would get a mix of pieces of different
packets (responses).

The reason for this was that, when selecting which output buffer
to write to the client, the flushing algorithm used to select
whichever of two buffers, associated with a session, had data to
flush.
This worked OK as long as the entire contents of the buffer could
be written in one writev() call. But when writev() would only be
able to write a part of the buffer, the next writev() could
already be done with another buffer, thus producing a mix of
*partial* writes from both buffers.

Between two buffers used for a session, one is always newer than
the other: the older one contains responses to requests which came
in earlier. So basically the bug was that we would select the newer
buffer for flushing while there were still unhandled requests
in the older buffer.

One approach to a fix would be to, once started, keep writing
the newer buffer until there is nothing to write in it.

The actual fix chooses a simpler approach: to never begin
with a newer buffer, until the older one is completely written.
Thus, there is never reordering of output between two
output buffers, only within a single buffer.

The second bug was caused by wrong calculation of write offset
for iproto header. When iproto_header would fall on the
border of two iov vectors, obuf_book would rightfully
discard the tail of the first iov vector, and "position"
the header at the beginning of the second vector.
However, the savepoint used to memcpy() the output would
still point at the first vector, and the header would be
(partially) written at the end of the first vector,
not at the beginning of the second.
This led to a) a write beyond allocated memory and b) loss
of a piece of a packet (the effect seen by the client).

The fix is to correctly record the write position when
making a booking. The obuf_book signature is changed
accordingly.

No test cases are included, since the bug is only reproducible under
certain concurrency circumstances (a single CPU box, client/server I/O).
@todo: add unit tests.
---
 include/iobuf.h | 32 ++++++++++++++++----------------
 src/iobuf.m     |  8 +++++---
 src/iproto.m    | 12 +++++++++---
 3 files changed, 30 insertions(+), 22 deletions(-)

diff --git a/include/iobuf.h b/include/iobuf.h
index deaf0ffaf2..60ea3fb88d 100644
--- a/include/iobuf.h
+++ b/include/iobuf.h
@@ -136,39 +136,39 @@ obuf_iovcnt(struct obuf *buf)
 	return buf->iov[buf->pos].iov_len > 0 ? buf->pos + 1 : buf->pos;
 }
 
+/**
+ * Output buffer savepoint. It's possible to
+ * save the current buffer state in a savepoint
+ * and roll back to the saved state at any time
+ * before iobuf_flush()
+ */
+struct obuf_svp
+{
+	size_t pos;
+	size_t iov_len;
+	size_t size;
+};
+
 /**
  * Reserve size bytes in the output buffer
  * and return a pointer to the reserved
  * data. Returns a pointer to a continuous piece of
  * memory.
  * Typical use case:
- * void *psize = obuf_book(buf, sizeof(uint32_t));
+ * struct obuf_svp svp = obuf_book(buf, sizeof(uint32_t));
  * for (...)
  *	obuf_dup(buf, ...);
  * uint32_t total = obuf_size(buf);
- * memcpy(psize, &total, sizeof(total);
+ * memcpy(obuf_svp_to_ptr(&svp), &total, sizeof(total));
  * iobuf_flush();
  */
-void *
+struct obuf_svp
 obuf_book(struct obuf *obuf, size_t size);
 
 /** Append data to the output buffer. */
 void
 obuf_dup(struct obuf *obuf, void *data, size_t size);
 
-/**
- * Output buffer savepoint. It's possible to
- * save the current buffer state in a savepoint
- * and roll back to the saved state at any time
- * before iobuf_flush()
- */
-struct obuf_svp
-{
-	size_t pos;
-	size_t iov_len;
-	size_t size;
-};
-
 static inline struct obuf_svp
 obuf_create_svp(struct obuf *buf)
 {
diff --git a/src/iobuf.m b/src/iobuf.m
index d230a0f735..a54b722d7c 100644
--- a/src/iobuf.m
+++ b/src/iobuf.m
@@ -200,7 +200,7 @@ obuf_dup(struct obuf *buf, void *data, size_t size)
 }
 
 /** Book a few bytes in the output buffer. */
-void *
+struct obuf_svp
 obuf_book(struct obuf *buf, size_t size)
 {
 	struct iovec *iov = &buf->iov[buf->pos];
@@ -221,11 +221,13 @@ obuf_book(struct obuf *buf, size_t size)
 			obuf_alloc_pos(buf, buf->pos, size);
 		}
 	}
-	void *booking = iov->iov_base + iov->iov_len;
+	struct obuf_svp svp = {
+		.pos = buf->pos, .iov_len = iov->iov_len, .size = buf->size
+	};
 	iov->iov_len += size;
 	buf->size += size;
 	assert(iov->iov_len <= buf->capacity[buf->pos]);
-	return booking;
+	return svp;
 }
 
 /** Forget about data in the output buffer beyond the savepoint. */
diff --git a/src/iproto.m b/src/iproto.m
index d3fa59411c..4e859b29cb 100644
--- a/src/iproto.m
+++ b/src/iproto.m
@@ -138,8 +138,7 @@ port_iproto_add_tuple(struct port *ptr, struct tuple *tuple, u32 flags)
 	struct port_iproto *port = port_iproto(ptr);
 	if (++port->reply.found == 1) {
 		/* Found the first tuple, add header. */
-		port->svp = obuf_create_svp(port->buf);
-		obuf_book(port->buf, sizeof(port->reply));
+		port->svp = obuf_book(port->buf, sizeof(port->reply));
 	}
 	if (flags & BOX_RETURN_TUPLE) {
 		obuf_dup(port->buf, &tuple->bsize, tuple_len(tuple));
@@ -635,7 +634,14 @@ iproto_session_output_iobuf(struct iproto_session *session)
 {
 	if (obuf_size(&session->iobuf[1]->out))
 		return session->iobuf[1];
-	if (obuf_size(&session->iobuf[0]->out))
+	/*
+	 * Don't try to write from a newer buffer if an older one
+	 * exists: in case of a partial write of a newer buffer,
+	 * the client may end up getting a salad of different
+	 * pieces of replies from both buffers.
+	 */
+	if (ibuf_size(&session->iobuf[1]->in) == 0 &&
+	    obuf_size(&session->iobuf[0]->out))
 		return session->iobuf[0];
 	return NULL;
 }
-- 
GitLab