diff --git a/src/box/iproto.cc b/src/box/iproto.cc index 52a0c135140c54a4d1ff3a93dff1045e4b24653f..ba0d1f76e01b951246783a56dafb84adc00fc1c8 100644 --- a/src/box/iproto.cc +++ b/src/box/iproto.cc @@ -945,7 +945,7 @@ iproto_connection_close(struct iproto_connection *con) * parsed data is processed. It's important this * is done only once. */ - con->p_ibuf->wpos -= con->parse_size; + ibuf_discard(con->p_ibuf, con->parse_size); mh_int_t node; mh_foreach(con->streams, node) { struct iproto_stream *stream = (struct iproto_stream *) @@ -1052,15 +1052,15 @@ iproto_connection_input_buffer(struct iproto_connection *con) } xibuf_reserve(new_ibuf, to_read + con->parse_size); - /* - * Discard unparsed data in the old buffer, otherwise it - * won't be recycled when all parsed requests are processed. - */ - old_ibuf->wpos -= con->parse_size; if (con->parse_size != 0) { /* Move the cached request prefix to the new buffer. */ - memcpy(new_ibuf->rpos, old_ibuf->wpos, con->parse_size); - new_ibuf->wpos += con->parse_size; + void *wpos = ibuf_alloc(new_ibuf, con->parse_size); + memcpy(wpos, old_ibuf->wpos - con->parse_size, con->parse_size); + /* + * Discard unparsed data in the old buffer, otherwise it + * won't be recycled when all parsed requests are processed. + */ + ibuf_discard(old_ibuf, con->parse_size); /* * We made ibuf idle. If obuf was already idle it * makes the both ibuf and obuf idle, time to trim diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c index bbe8ca3d02ac6ba3b67d013e5553f0d8f16531e7..99c5b31b1a3d997fa58f9f355c6536077d47a990 100644 --- a/src/box/lua/merger.c +++ b/src/box/lua/merger.c @@ -115,8 +115,8 @@ decode_header(struct ibuf *buf, size_t *len_p) static void encode_header(struct ibuf *output_buffer, uint32_t result_len) { - ibuf_reserve(output_buffer, mp_sizeof_array(result_len)); - output_buffer->wpos = mp_encode_array(output_buffer->wpos, result_len); + void *wpos = xibuf_alloc(output_buffer, mp_sizeof_array(result_len)); + mp_encode_array(wpos, result_len); } /** @@ -596,10 +596,10 @@ luaL_merge_source_buffer_next(struct merge_source *base, return -1; } --source->remaining_tuple_count; - source->buf->rpos = (char *) tuple_end; if (format == NULL) format = tuple_format_runtime; struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end); + ibuf_consume_before(source->buf, tuple_end); if (tuple == NULL) return -1; @@ -1140,9 +1140,8 @@ encode_result_buffer(struct lua_State *L, struct merge_source *source, merge_source_next(source, NULL, &tuple)) == 0 && tuple != NULL) { uint32_t bsize = tuple_bsize(tuple); - ibuf_reserve(output_buffer, bsize); - memcpy(output_buffer->wpos, tuple_data(tuple), bsize); - output_buffer->wpos += bsize; + void *wpos = xibuf_alloc(output_buffer, bsize); + memcpy(wpos, tuple_data(tuple), bsize); result_len_offset += bsize; ++result_len; diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c index 7cba024bfd6c14cbefa0fa3fd94bb602ec29f6d9..8e14945356591a0702036a41e323b84540f26df4 100644 --- a/src/box/lua/net_box.c +++ b/src/box/lua/net_box.c @@ -270,6 +270,8 @@ struct netbox_transport { struct ibuf send_buf; /** Connection receive buffer. */ struct ibuf recv_buf; + /** Size of the last received message. */ + size_t last_msg_size; /** Signalled when send_buf becomes empty. */ struct fiber_cond on_send_buf_empty; /** Next request id. */ @@ -523,6 +525,7 @@ netbox_transport_create(struct netbox_transport *transport) iostream_clear(&transport->io); ibuf_create(&transport->send_buf, &cord()->slabc, NETBOX_READAHEAD); ibuf_create(&transport->recv_buf, &cord()->slabc, NETBOX_READAHEAD); + transport->last_msg_size = 0; fiber_cond_create(&transport->on_send_buf_empty); transport->next_sync = 1; transport->requests = mh_i64ptr_new(); @@ -580,6 +583,7 @@ netbox_transport_set_error(struct netbox_transport *transport) /* Reset buffers. */ ibuf_reinit(&transport->send_buf); ibuf_reinit(&transport->recv_buf); + transport->last_msg_size = 0; fiber_cond_broadcast(&transport->on_send_buf_empty); /* Complete requests and clean up the hash. */ struct mh_i64ptr_t *h = transport->requests; @@ -1142,7 +1146,7 @@ netbox_transport_communicate(struct netbox_transport *transport, size_t limit) "Peer closed"); return -1; } if (rc > 0) { - recv_buf->wpos += rc; + VERIFY(ibuf_alloc(recv_buf, rc) != NULL); } else if (rc == IOSTREAM_ERROR) { goto io_error; } else { @@ -1156,7 +1160,7 @@ netbox_transport_communicate(struct netbox_transport *transport, size_t limit) ssize_t rc = iostream_write(io, send_buf->rpos, ibuf_used(send_buf)); if (rc >= 0) { - send_buf->rpos += rc; + ibuf_consume(send_buf, rc); if (ibuf_used(send_buf) == 0) fiber_cond_broadcast(on_send_buf_empty); } else if (rc == IOSTREAM_ERROR) { @@ -1193,6 +1197,7 @@ static int netbox_transport_send_and_recv(struct netbox_transport *transport, struct xrow_header *hdr) { + ibuf_consume(&transport->recv_buf, transport->last_msg_size); while (true) { size_t required; size_t data_len = ibuf_used(&transport->recv_buf); @@ -1212,10 +1217,11 @@ netbox_transport_send_and_recv(struct netbox_transport *transport, required = size + len; if (data_len >= required) { const char *body_end = rpos + len; - transport->recv_buf.rpos = (char *)body_end; - return xrow_header_decode( - hdr, &rpos, body_end, - /*end_is_exact=*/true); + int rc = xrow_header_decode( + hdr, &rpos, body_end, + /*end_is_exact=*/true); + transport->last_msg_size = body_end - bufpos; + return rc; } } if (netbox_transport_communicate(transport, required) != 0) diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua index 646cad25f0a01fe27ac5992e6441393cb1a6bf15..06644b3825490ab2773a732c2567afc5db78ca7d 100644 --- a/src/box/lua/schema.lua +++ b/src/box/lua/schema.lua @@ -1941,7 +1941,7 @@ local function iterator_pos_set(index, pos, ibuf) iterator_pos_end[0] = iterator_pos[0] + #pos return true else - ibuf.rpos = ibuf.wpos + ibuf:consume(ibuf.wpos - ibuf.rpos) local tuple, tuple_end = tuple_encode(ibuf, pos) return builtin.box_index_tuple_position( index.space_id, index.id, tuple, tuple_end, diff --git a/src/box/lua/tuple.lua b/src/box/lua/tuple.lua index a38a4f84ef509e3ede17629d103e69b6209077e0..584fe32ab1a69a8b177e49bcd12cdc583f5b4066 100644 --- a/src/box/lua/tuple.lua +++ b/src/box/lua/tuple.lua @@ -304,9 +304,7 @@ end local function tuple_to_msgpack(buf, tuple) assert(ffi.istype(tuple_t, tuple)) local bsize = builtin.box_tuple_bsize(tuple) - buf:reserve(bsize) - builtin.box_tuple_to_buf(tuple, buf.wpos, bsize) - buf.wpos = buf.wpos + bsize + builtin.box_tuple_to_buf(tuple, buf:alloc(bsize), bsize) end local function tuple_bsize(tuple) diff --git a/src/httpc.c b/src/httpc.c index 025346e149bfdf33357c4381d04ab95fdc3c30c4..14880200d1feeb1abd2210802505c7a1801abb97 100644 --- a/src/httpc.c +++ b/src/httpc.c @@ -68,7 +68,7 @@ curl_easy_io_read_cb(char *buffer, size_t size, size_t nitems, void *ctx) size_t read_len = ibuf_len < buffer_size ? ibuf_len : buffer_size; memcpy(buffer, req->send.rpos, read_len); - req->send.rpos += read_len; + ibuf_consume(&req->send, read_len); fiber_cond_broadcast(&req->io_send_cond); return read_len; @@ -541,7 +541,7 @@ httpc_request_io_read(struct httpc_request *req, char *buf, size_t len, if (copied == ibuf_len) ibuf_reset(&req->io_recv); else - req->io_recv.rpos += copied; + ibuf_consume(&req->io_recv, copied); } if (copied < len && recv_len > 0) { diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h index 0d90a483c06d7d73aa6ee93a123c087651fcf702..110cc700c5b598bf722f3354f7113266bf2602d7 100644 --- a/src/lib/core/coio_buf.h +++ b/src/lib/core/coio_buf.h @@ -50,7 +50,7 @@ coio_bread(struct iostream *io, struct ibuf *buf, size_t sz) ssize_t n = coio_read_ahead(io, buf->wpos, sz, ibuf_unused(buf)); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } @@ -68,7 +68,7 @@ coio_bread_timeout(struct iostream *io, struct ibuf *buf, size_t sz, timeout); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } @@ -80,7 +80,7 @@ coio_breadn(struct iostream *io, struct ibuf *buf, size_t sz) ssize_t n = coio_readn_ahead(io, buf->wpos, sz, ibuf_unused(buf)); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } @@ -99,7 +99,7 @@ coio_breadn_timeout(struct iostream *io, struct ibuf *buf, size_t sz, timeout); if (n < 0) diag_raise(); - buf->wpos += n; + VERIFY(ibuf_alloc(buf, n) != NULL); return n; } diff --git a/src/lib/core/prbuf.c b/src/lib/core/prbuf.c index e60dc7d04bb59c42cc71ca83f816d2da8e097ca2..da7d49a18e8714a8252bcd2e014dda5b89953b89 100644 --- a/src/lib/core/prbuf.c +++ b/src/lib/core/prbuf.c @@ -427,6 +427,7 @@ prbuf_reader_wrap(struct prbuf_reader *reader) ibuf_reset(&reader->buf); reader->pos = reader->data_begin; reader->read_pos = reader->pos; + reader->last_read_size = 0; } int @@ -445,6 +446,8 @@ prbuf_reader_next(struct prbuf_reader *reader, return 0; } + /* Consume the record data of the previous read. */ + ibuf_consume(&reader->buf, reader->last_read_size); /* Check if we hit end of buffer and need to wrap around. */ if (reader->data_end - reader->pos < (off_t)record_size_overhead) prbuf_reader_wrap(reader); @@ -472,14 +475,14 @@ prbuf_reader_next(struct prbuf_reader *reader, } /* Read record data. */ - reader->buf.rpos += record_size_overhead; + ibuf_consume(&reader->buf, record_size_overhead); if (prbuf_reader_ensure(reader, sz) != 0) return -1; entry->ptr = reader->buf.rpos; entry->size = sz; + reader->last_read_size = sz; reader->pos += full_sz; - reader->buf.rpos += sz; reader->unread_size -= full_sz; return 0; } diff --git a/src/lib/core/prbuf.h b/src/lib/core/prbuf.h index ab5303bb0d9a9412bdad072a9a93f116ded2805a..c06426984ddfb3ad87641eee86184d420091427f 100644 --- a/src/lib/core/prbuf.h +++ b/src/lib/core/prbuf.h @@ -94,6 +94,10 @@ struct prbuf_reader { * If it is 0 then we read all the data. */ size_t unread_size; + /** + * Number of bytes in the last record read by client. + */ + size_t last_read_size; /** * File offset of the beginning of the data area. * Data area is the area after header till the end of the buffer. @@ -122,6 +126,9 @@ prbuf_reader_create(struct prbuf_reader *reader, int fd, off_t offset); * Read the next record into entry argument. If there are no more records * then returned record will be a terminator (EOF, ptr == NULL && size == 0). * + * Pointer to data on successful read is valid until next call to this + * function. + * * After EOF the function can be called again and will return EOF. * * After failure reader is invalid and can only be closed. diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua index c34298a3dc4d5fffc920b6ca3485ec0c425abd19..7d6ad9952415bfe858edafbe74c9ffa63c92d969 100644 --- a/src/lua/buffer.lua +++ b/src/lua/buffer.lua @@ -154,6 +154,13 @@ local function ibuf_read(buf, size) return rpos end +local function ibuf_consume(buf, size) + checkibuf(buf, 'consume') + checksize(buf, size) + utils.poison_memory_region(buf.rpos, size); + buf.rpos = buf.rpos + size +end + local function ibuf_serialize(buf) local properties = { rpos = buf.rpos, wpos = buf.wpos } return { ibuf = properties } @@ -168,6 +175,7 @@ local ibuf_methods = { checksize = ibuf_checksize; read = ibuf_read; + consume = ibuf_consume; __serialize = ibuf_serialize; size = ibuf_used; diff --git a/src/lua/httpc.lua b/src/lua/httpc.lua index e694f1fc1db4e7d48e2a740fce11ce91258b24b3..a8f058bf6acb41ec0df033d9c96cec42845b90a7 100644 --- a/src/lua/httpc.lua +++ b/src/lua/httpc.lua @@ -424,7 +424,7 @@ local function io_read(self, opts, timeout) local len = check(self, chunk, delimiter) if len ~= nil then local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end @@ -438,15 +438,15 @@ local function io_read(self, opts, timeout) self._errno = nil local len = rbuf:size() local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data else - rbuf.wpos = rbuf.wpos + res + rbuf:alloc(res) local len = check(self, chunk, delimiter) if len ~= nil then self._errno = nil local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end end diff --git a/src/lua/socket.lua b/src/lua/socket.lua index 28581480497bde2d590a782e3f2de014239e099b..2b6b59759d73c467827582b73f23a07fa8ee952d 100644 --- a/src/lua/socket.lua +++ b/src/lua/socket.lua @@ -705,7 +705,7 @@ local function read(self, limit, timeout, check, ...) if len ~= nil then self._errno = nil local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end @@ -720,15 +720,15 @@ local function read(self, limit, timeout, check, ...) self._errno = nil local len = rbuf:size() local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data elseif res ~= nil then - rbuf.wpos = rbuf.wpos + res + rbuf:alloc(res) local len = check(self, limit, ...) if len ~= nil then self._errno = nil local data = ffi.string(rbuf.rpos, len) - rbuf.rpos = rbuf.rpos + len + rbuf:consume(len) return data end elseif not errno_is_transient[self._errno] then