Commit fafa3465 authored by Konstantin Osipov

Fix two bugs in the new async I/O.

There were two cases in which the new async I/O worked incorrectly.
Both problems showed up only under specific concurrency
conditions.

In one case, the client would get a mix of pieces of different
packets (responses).

The reason was that, when choosing which output buffer to write
to the client, the flushing algorithm picked whichever of the two
buffers associated with a session had data to flush.
This worked as long as the entire contents of a buffer could
be written in one writev() call. But when writev() managed to
write only part of a buffer, the next writev() could already be
issued for the other buffer, producing a mix of *partial* writes
from both buffers.
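
To make the partial-write case concrete, here is a minimal sketch of
how a caller might account for a short writev() before retrying.
The helper name iov_advance() is hypothetical and is not part of this
commit; it only shows that a short write leaves a remainder which has
to go out before anything from another buffer.

```c
#include <assert.h>
#include <stddef.h>
#include <sys/uio.h>

/*
 * Hypothetical helper (not from the commit): skip `written` bytes at
 * the front of an iovec array after a partial writev(). Returns the
 * index of the first vector that still holds unwritten data, or
 * iovcnt if everything was written.
 */
static int
iov_advance(struct iovec *iov, int iovcnt, size_t written)
{
	int i = 0;
	assert(iovcnt >= 0);
	/* Drop vectors that were written completely. */
	while (i < iovcnt && written >= iov[i].iov_len) {
		written -= iov[i].iov_len;
		i++;
	}
	/* Trim the vector that was written only partially. */
	if (i < iovcnt && written > 0) {
		iov[i].iov_base = (char *) iov[i].iov_base + written;
		iov[i].iov_len -= written;
	}
	return i;
}
```

As long as iov_advance() returns an index below iovcnt, the same
buffer still has bytes pending, and a write from the other buffer at
this point would interleave the two responses.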

Of the two buffers used for a session, one is always newer than
the other, and the older one contains responses to requests which
came in earlier.
So basically the bug was that we would select the newer
buffer for flushing while there were still unhandled requests
in the older buffer.

One possible fix would be, once a write from the newer buffer
has started, to keep writing that buffer until nothing is left in it.

The actual fix takes a simpler approach: never start writing
the newer buffer until the older one is completely written.
Thus output is never reordered between the two
output buffers, only within a single buffer.
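
The selection rule can be sketched in isolation as follows. This is
a toy model under stated assumptions, not the real code: struct
toy_iobuf and toy_output_iobuf() are hypothetical names, and the
convention that index 1 is the older buffer is inferred from the diff
of iproto_session_output_iobuf() in this commit.

```c
#include <assert.h>
#include <stddef.h>

/* Toy stand-in for an iobuf: only the byte counts matter here. */
struct toy_iobuf {
	size_t in_size;		/* unhandled request bytes */
	size_t out_size;	/* unflushed response bytes */
};

/*
 * Pick the buffer to flush. Assumption: iobuf[1] is the older buffer
 * and iobuf[0] the newer one. The newer buffer is eligible only when
 * the older one has neither output to flush nor requests to handle.
 */
static struct toy_iobuf *
toy_output_iobuf(struct toy_iobuf *iobuf[2])
{
	assert(iobuf[0] != NULL && iobuf[1] != NULL);
	if (iobuf[1]->out_size > 0)
		return iobuf[1];
	if (iobuf[1]->in_size == 0 && iobuf[0]->out_size > 0)
		return iobuf[0];
	return NULL;
}
```

With this rule, a newer buffer that is ready to flush simply waits
while the older one still has requests in flight.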

The second bug was caused by a wrong calculation of the write
offset for the iproto header. When iproto_header fell on the
border of two iov vectors, obuf_book would rightfully
discard the tail of the first iov vector and "position"
the header at the beginning of the second vector.
However, the savepoint used to memcpy() the output would
still point at the first vector, and the header would be
(partially) written at the end of the first vector,
not at the beginning of the second.
This led to a) a write beyond allocated memory and b) the loss
of a piece of a packet (the effect seen by the client).

The fix is to correctly record the write position when
making a booking. The obuf_book signature is changed
accordingly.
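
A toy model of the corrected booking is sketched below. All toy_*
names and the fixed vector sizes are hypothetical; the real obuf
allocates its vectors dynamically. The point it illustrates is that
the savepoint is taken only after the booking has settled on a
vector, so it always points at the reserved bytes.

```c
#include <assert.h>
#include <stddef.h>
#include <string.h>

enum { TOY_IOV_MAX = 4, TOY_IOV_CAP = 8 };

/* Toy output buffer: a fixed array of fixed-size vectors. */
struct toy_obuf {
	char data[TOY_IOV_MAX][TOY_IOV_CAP];
	size_t len[TOY_IOV_MAX];	/* bytes used in each vector */
	size_t pos;			/* index of the current vector */
};

struct toy_svp {
	size_t pos;	/* vector that holds the booking */
	size_t iov_len;	/* offset of the booking in that vector */
};

/* Reserve `size` contiguous bytes and return where they start. */
static struct toy_svp
toy_book(struct toy_obuf *buf, size_t size)
{
	assert(size <= TOY_IOV_CAP);
	if (buf->len[buf->pos] + size > TOY_IOV_CAP) {
		/* The tail of the current vector is too small: skip it. */
		buf->pos++;
		assert(buf->pos < TOY_IOV_MAX);
	}
	/* Record the position only after the vector has been chosen. */
	struct toy_svp svp = { buf->pos, buf->len[buf->pos] };
	buf->len[buf->pos] += size;
	return svp;
}

/* Translate a savepoint into a pointer to the booked bytes. */
static void *
toy_svp_to_ptr(struct toy_obuf *buf, const struct toy_svp *svp)
{
	return buf->data[svp->pos] + svp->iov_len;
}
```

Taking the savepoint before the call, as the old code did, would
record the tail of the first vector whenever the booking spills into
the second one.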

No test cases, since the bug can only be spotted under certain
concurrency circumstances (a single CPU box, client/server I/O).
@todo: add unit tests.
parent 9ca7fda1
@@ -136,39 +136,39 @@ obuf_iovcnt(struct obuf *buf)
 	return buf->iov[buf->pos].iov_len > 0 ? buf->pos + 1 : buf->pos;
 }
+/**
+ * Output buffer savepoint. It's possible to
+ * save the current buffer state in a savepoint
+ * and roll back to the saved state at any time
+ * before iobuf_flush()
+ */
+struct obuf_svp
+{
+	size_t pos;
+	size_t iov_len;
+	size_t size;
+};
 /**
  * Reserve size bytes in the output buffer
  * and return a pointer to the reserved
  * data. Returns a pointer to a continuous piece of
  * memory.
  * Typical use case:
- * void *psize = obuf_book(buf, sizeof(uint32_t));
+ * struct obuf_svp svp = obuf_book(buf, sizeof(uint32_t));
  * for (...)
  * 	obuf_dup(buf, ...);
  * uint32_t total = obuf_size(buf);
- * memcpy(psize, &total, sizeof(total);
+ * memcpy(obuf_svp_to_ptr(&svp), &total, sizeof(total);
  * iobuf_flush();
  */
-void *
+struct obuf_svp
 obuf_book(struct obuf *obuf, size_t size);
 /** Append data to the output buffer. */
 void
 obuf_dup(struct obuf *obuf, void *data, size_t size);
-/**
- * Output buffer savepoint. It's possible to
- * save the current buffer state in a savepoint
- * and roll back to the saved state at any time
- * before iobuf_flush()
- */
-struct obuf_svp
-{
-	size_t pos;
-	size_t iov_len;
-	size_t size;
-};
 static inline struct obuf_svp
 obuf_create_svp(struct obuf *buf)
 {
......
@@ -200,7 +200,7 @@ obuf_dup(struct obuf *buf, void *data, size_t size)
 }
 /** Book a few bytes in the output buffer. */
-void *
+struct obuf_svp
 obuf_book(struct obuf *buf, size_t size)
 {
 	struct iovec *iov = &buf->iov[buf->pos];
@@ -221,11 +221,13 @@ obuf_book(struct obuf *buf, size_t size)
 			obuf_alloc_pos(buf, buf->pos, size);
 		}
 	}
-	void *booking = iov->iov_base + iov->iov_len;
+	struct obuf_svp svp = {
+		.pos = buf->pos, .iov_len = iov->iov_len, .size = buf->size
+	};
 	iov->iov_len += size;
 	buf->size += size;
 	assert(iov->iov_len <= buf->capacity[buf->pos]);
-	return booking;
+	return svp;
 }
 /** Forget about data in the output buffer beyond the savepoint. */
......
@@ -138,8 +138,7 @@ port_iproto_add_tuple(struct port *ptr, struct tuple *tuple, u32 flags)
 	struct port_iproto *port = port_iproto(ptr);
 	if (++port->reply.found == 1) {
 		/* Found the first tuple, add header. */
-		port->svp = obuf_create_svp(port->buf);
-		obuf_book(port->buf, sizeof(port->reply));
+		port->svp = obuf_book(port->buf, sizeof(port->reply));
 	}
 	if (flags & BOX_RETURN_TUPLE) {
 		obuf_dup(port->buf, &tuple->bsize, tuple_len(tuple));
@@ -635,7 +634,14 @@ iproto_session_output_iobuf(struct iproto_session *session)
 {
 	if (obuf_size(&session->iobuf[1]->out))
 		return session->iobuf[1];
-	if (obuf_size(&session->iobuf[0]->out))
+	/*
+	 * Don't try to write from a newer buffer if an older one
+	 * exists: in case of a partial write of a newer buffer,
+	 * the client may end up getting a salad of different
+	 * pieces of replies from both buffers.
+	 */
+	if (ibuf_size(&session->iobuf[1]->in) == 0 &&
+	    obuf_size(&session->iobuf[0]->out))
 		return session->iobuf[0];
 	return NULL;
 }
......