From 7383266879c3c3842c6ff8105b66ae079cb70b45 Mon Sep 17 00:00:00 2001
From: Nikolay Shirokovskiy <nshirokovskiy@tarantool.org>
Date: Mon, 16 Oct 2023 10:19:01 +0300
Subject: [PATCH] misc: use ibuf API to discard/allocate/consume

The API functions additionally poison related data in ASAN build.

Follow-up #7327

NO_TEST=refactoring
NO_CHANGELOG=refactoring
NO_DOC=refactoring
---
 src/box/iproto.cc       | 16 ++++++++--------
 src/box/lua/merger.c    | 11 +++++------
 src/box/lua/net_box.c   | 18 ++++++++++++------
 src/box/lua/schema.lua  |  2 +-
 src/box/lua/tuple.lua   |  4 +---
 src/httpc.c             |  4 ++--
 src/lib/core/coio_buf.h |  8 ++++----
 src/lib/core/prbuf.c    |  7 +++++--
 src/lib/core/prbuf.h    |  7 +++++++
 src/lua/buffer.lua      |  8 ++++++++
 src/lua/httpc.lua       |  8 ++++----
 src/lua/socket.lua      |  8 ++++----
 12 files changed, 61 insertions(+), 40 deletions(-)

diff --git a/src/box/iproto.cc b/src/box/iproto.cc
index 52a0c13514..ba0d1f76e0 100644
--- a/src/box/iproto.cc
+++ b/src/box/iproto.cc
@@ -945,7 +945,7 @@ iproto_connection_close(struct iproto_connection *con)
 		 * parsed data is processed.  It's important this
 		 * is done only once.
 		 */
-		con->p_ibuf->wpos -= con->parse_size;
+		ibuf_discard(con->p_ibuf, con->parse_size);
 		mh_int_t node;
 		mh_foreach(con->streams, node) {
 			struct iproto_stream *stream = (struct iproto_stream *)
@@ -1052,15 +1052,15 @@ iproto_connection_input_buffer(struct iproto_connection *con)
 	}
 
 	xibuf_reserve(new_ibuf, to_read + con->parse_size);
-	/*
-	 * Discard unparsed data in the old buffer, otherwise it
-	 * won't be recycled when all parsed requests are processed.
-	 */
-	old_ibuf->wpos -= con->parse_size;
 	if (con->parse_size != 0) {
 		/* Move the cached request prefix to the new buffer. */
-		memcpy(new_ibuf->rpos, old_ibuf->wpos, con->parse_size);
-		new_ibuf->wpos += con->parse_size;
+		void *wpos = ibuf_alloc(new_ibuf, con->parse_size);
+		memcpy(wpos, old_ibuf->wpos - con->parse_size, con->parse_size);
+		/*
+		 * Discard unparsed data in the old buffer, otherwise it
+		 * won't be recycled when all parsed requests are processed.
+		 */
+		ibuf_discard(old_ibuf, con->parse_size);
 		/*
 		 * We made ibuf idle. If obuf was already idle it
 		 * makes the both ibuf and obuf idle, time to trim
diff --git a/src/box/lua/merger.c b/src/box/lua/merger.c
index bbe8ca3d02..99c5b31b1a 100644
--- a/src/box/lua/merger.c
+++ b/src/box/lua/merger.c
@@ -115,8 +115,8 @@ decode_header(struct ibuf *buf, size_t *len_p)
 static void
 encode_header(struct ibuf *output_buffer, uint32_t result_len)
 {
-	ibuf_reserve(output_buffer, mp_sizeof_array(result_len));
-	output_buffer->wpos = mp_encode_array(output_buffer->wpos, result_len);
+	void *wpos = xibuf_alloc(output_buffer, mp_sizeof_array(result_len));
+	mp_encode_array(wpos, result_len);
 }
 
 /**
@@ -596,10 +596,10 @@ luaL_merge_source_buffer_next(struct merge_source *base,
 		return -1;
 	}
 	--source->remaining_tuple_count;
-	source->buf->rpos = (char *) tuple_end;
 	if (format == NULL)
 		format = tuple_format_runtime;
 	struct tuple *tuple = tuple_new(format, tuple_beg, tuple_end);
+	ibuf_consume_before(source->buf, tuple_end);
 	if (tuple == NULL)
 		return -1;
 
@@ -1140,9 +1140,8 @@ encode_result_buffer(struct lua_State *L, struct merge_source *source,
 	       merge_source_next(source, NULL, &tuple)) == 0 &&
 	       tuple != NULL) {
 		uint32_t bsize = tuple_bsize(tuple);
-		ibuf_reserve(output_buffer, bsize);
-		memcpy(output_buffer->wpos, tuple_data(tuple), bsize);
-		output_buffer->wpos += bsize;
+		void *wpos = xibuf_alloc(output_buffer, bsize);
+		memcpy(wpos, tuple_data(tuple), bsize);
 		result_len_offset += bsize;
 		++result_len;
 
diff --git a/src/box/lua/net_box.c b/src/box/lua/net_box.c
index 7cba024bfd..8e14945356 100644
--- a/src/box/lua/net_box.c
+++ b/src/box/lua/net_box.c
@@ -270,6 +270,8 @@ struct netbox_transport {
 	struct ibuf send_buf;
 	/** Connection receive buffer. */
 	struct ibuf recv_buf;
+	/** Size of the last received message. */
+	size_t last_msg_size;
 	/** Signalled when send_buf becomes empty. */
 	struct fiber_cond on_send_buf_empty;
 	/** Next request id. */
@@ -523,6 +525,7 @@ netbox_transport_create(struct netbox_transport *transport)
 	iostream_clear(&transport->io);
 	ibuf_create(&transport->send_buf, &cord()->slabc, NETBOX_READAHEAD);
 	ibuf_create(&transport->recv_buf, &cord()->slabc, NETBOX_READAHEAD);
+	transport->last_msg_size = 0;
 	fiber_cond_create(&transport->on_send_buf_empty);
 	transport->next_sync = 1;
 	transport->requests = mh_i64ptr_new();
@@ -580,6 +583,7 @@ netbox_transport_set_error(struct netbox_transport *transport)
 	/* Reset buffers. */
 	ibuf_reinit(&transport->send_buf);
 	ibuf_reinit(&transport->recv_buf);
+	transport->last_msg_size = 0;
 	fiber_cond_broadcast(&transport->on_send_buf_empty);
 	/* Complete requests and clean up the hash. */
 	struct mh_i64ptr_t *h = transport->requests;
@@ -1142,7 +1146,7 @@ netbox_transport_communicate(struct netbox_transport *transport, size_t limit)
 						"Peer closed");
 				return -1;
 			} if (rc > 0) {
-				recv_buf->wpos += rc;
+				VERIFY(ibuf_alloc(recv_buf, rc) != NULL);
 			} else if (rc == IOSTREAM_ERROR) {
 				goto io_error;
 			} else {
@@ -1156,7 +1160,7 @@ netbox_transport_communicate(struct netbox_transport *transport, size_t limit)
 			ssize_t rc = iostream_write(io, send_buf->rpos,
 						    ibuf_used(send_buf));
 			if (rc >= 0) {
-				send_buf->rpos += rc;
+				ibuf_consume(send_buf, rc);
 				if (ibuf_used(send_buf) == 0)
 					fiber_cond_broadcast(on_send_buf_empty);
 			} else if (rc == IOSTREAM_ERROR) {
@@ -1193,6 +1197,7 @@ static int
 netbox_transport_send_and_recv(struct netbox_transport *transport,
 			       struct xrow_header *hdr)
 {
+	ibuf_consume(&transport->recv_buf, transport->last_msg_size);
 	while (true) {
 		size_t required;
 		size_t data_len = ibuf_used(&transport->recv_buf);
@@ -1212,10 +1217,11 @@ netbox_transport_send_and_recv(struct netbox_transport *transport,
 			required = size + len;
 			if (data_len >= required) {
 				const char *body_end = rpos + len;
-				transport->recv_buf.rpos = (char *)body_end;
-				return xrow_header_decode(
-					hdr, &rpos, body_end,
-					/*end_is_exact=*/true);
+				int rc = xrow_header_decode(
+						hdr, &rpos, body_end,
+						/*end_is_exact=*/true);
+				transport->last_msg_size = body_end - bufpos;
+				return rc;
 			}
 		}
 		if (netbox_transport_communicate(transport, required) != 0)
diff --git a/src/box/lua/schema.lua b/src/box/lua/schema.lua
index 646cad25f0..06644b3825 100644
--- a/src/box/lua/schema.lua
+++ b/src/box/lua/schema.lua
@@ -1941,7 +1941,7 @@ local function iterator_pos_set(index, pos, ibuf)
         iterator_pos_end[0] = iterator_pos[0] + #pos
         return true
     else
-        ibuf.rpos = ibuf.wpos
+        ibuf:consume(ibuf.wpos - ibuf.rpos)
         local tuple, tuple_end = tuple_encode(ibuf, pos)
         return builtin.box_index_tuple_position(
                 index.space_id, index.id, tuple, tuple_end,
diff --git a/src/box/lua/tuple.lua b/src/box/lua/tuple.lua
index a38a4f84ef..584fe32ab1 100644
--- a/src/box/lua/tuple.lua
+++ b/src/box/lua/tuple.lua
@@ -304,9 +304,7 @@ end
 local function tuple_to_msgpack(buf, tuple)
     assert(ffi.istype(tuple_t, tuple))
     local bsize = builtin.box_tuple_bsize(tuple)
-    buf:reserve(bsize)
-    builtin.box_tuple_to_buf(tuple, buf.wpos, bsize)
-    buf.wpos = buf.wpos + bsize
+    builtin.box_tuple_to_buf(tuple, buf:alloc(bsize), bsize)
 end
 
 local function tuple_bsize(tuple)
diff --git a/src/httpc.c b/src/httpc.c
index 025346e149..14880200d1 100644
--- a/src/httpc.c
+++ b/src/httpc.c
@@ -68,7 +68,7 @@ curl_easy_io_read_cb(char *buffer, size_t size, size_t nitems, void *ctx)
 
 	size_t read_len = ibuf_len < buffer_size ? ibuf_len : buffer_size;
 	memcpy(buffer, req->send.rpos, read_len);
-	req->send.rpos += read_len;
+	ibuf_consume(&req->send, read_len);
 
 	fiber_cond_broadcast(&req->io_send_cond);
 	return read_len;
@@ -541,7 +541,7 @@ httpc_request_io_read(struct httpc_request *req, char *buf, size_t len,
 		if (copied == ibuf_len)
 			ibuf_reset(&req->io_recv);
 		else
-			req->io_recv.rpos += copied;
+			ibuf_consume(&req->io_recv, copied);
 	}
 
 	if (copied < len && recv_len > 0) {
diff --git a/src/lib/core/coio_buf.h b/src/lib/core/coio_buf.h
index 0d90a483c0..110cc700c5 100644
--- a/src/lib/core/coio_buf.h
+++ b/src/lib/core/coio_buf.h
@@ -50,7 +50,7 @@ coio_bread(struct iostream *io, struct ibuf *buf, size_t sz)
 	ssize_t n = coio_read_ahead(io, buf->wpos, sz, ibuf_unused(buf));
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
@@ -68,7 +68,7 @@ coio_bread_timeout(struct iostream *io, struct ibuf *buf, size_t sz,
 			                    timeout);
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
@@ -80,7 +80,7 @@ coio_breadn(struct iostream *io, struct ibuf *buf, size_t sz)
 	ssize_t n = coio_readn_ahead(io, buf->wpos, sz, ibuf_unused(buf));
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
@@ -99,7 +99,7 @@ coio_breadn_timeout(struct iostream *io, struct ibuf *buf, size_t sz,
 			                     timeout);
 	if (n < 0)
 		diag_raise();
-	buf->wpos += n;
+	VERIFY(ibuf_alloc(buf, n) != NULL);
 	return n;
 }
 
diff --git a/src/lib/core/prbuf.c b/src/lib/core/prbuf.c
index e60dc7d04b..da7d49a18e 100644
--- a/src/lib/core/prbuf.c
+++ b/src/lib/core/prbuf.c
@@ -427,6 +427,7 @@ prbuf_reader_wrap(struct prbuf_reader *reader)
 	ibuf_reset(&reader->buf);
 	reader->pos = reader->data_begin;
 	reader->read_pos = reader->pos;
+	reader->last_read_size = 0;
 }
 
 int
@@ -445,6 +446,8 @@ prbuf_reader_next(struct prbuf_reader *reader,
 		return 0;
 	}
 
+	/* Consume the record data of the previous read. */
+	ibuf_consume(&reader->buf, reader->last_read_size);
 	/* Check if we hit end of buffer and need to wrap around. */
 	if (reader->data_end - reader->pos < (off_t)record_size_overhead)
 		prbuf_reader_wrap(reader);
@@ -472,14 +475,14 @@ prbuf_reader_next(struct prbuf_reader *reader,
 	}
 
 	/* Read record data. */
-	reader->buf.rpos += record_size_overhead;
+	ibuf_consume(&reader->buf, record_size_overhead);
 	if (prbuf_reader_ensure(reader, sz) != 0)
 		return -1;
 
 	entry->ptr = reader->buf.rpos;
 	entry->size = sz;
+	reader->last_read_size = sz;
 	reader->pos += full_sz;
-	reader->buf.rpos += sz;
 	reader->unread_size -= full_sz;
 	return 0;
 }
diff --git a/src/lib/core/prbuf.h b/src/lib/core/prbuf.h
index ab5303bb0d..c06426984d 100644
--- a/src/lib/core/prbuf.h
+++ b/src/lib/core/prbuf.h
@@ -94,6 +94,10 @@ struct prbuf_reader {
 	 * If it is 0 then we read all the data.
 	 */
 	size_t unread_size;
+	/**
+	 * Number of bytes in the last record read by client.
+	 */
+	size_t last_read_size;
 	/**
 	 * File offset of the beginning of the data area.
 	 * Data area is the area after header till the end of the buffer.
@@ -122,6 +126,9 @@ prbuf_reader_create(struct prbuf_reader *reader, int fd, off_t offset);
  * Read the next record into entry argument. If there are no more records
  * then returned record will be a terminator (EOF, ptr == NULL && size == 0).
  *
+ * Pointer to data on successful read is valid until next call to this
+ * function.
+ *
  * After EOF the function can be called again and will return EOF.
  *
  * After failure reader is invalid and can only be closed.
diff --git a/src/lua/buffer.lua b/src/lua/buffer.lua
index c34298a3dc..7d6ad99524 100644
--- a/src/lua/buffer.lua
+++ b/src/lua/buffer.lua
@@ -154,6 +154,13 @@ local function ibuf_read(buf, size)
     return rpos
 end
 
+local function ibuf_consume(buf, size)
+    checkibuf(buf, 'consume')
+    checksize(buf, size)
+    utils.poison_memory_region(buf.rpos, size);
+    buf.rpos = buf.rpos + size
+end
+
 local function ibuf_serialize(buf)
     local properties = { rpos = buf.rpos, wpos = buf.wpos }
     return { ibuf = properties }
@@ -168,6 +175,7 @@ local ibuf_methods = {
 
     checksize = ibuf_checksize;
     read = ibuf_read;
+    consume = ibuf_consume;
     __serialize = ibuf_serialize;
 
     size = ibuf_used;
diff --git a/src/lua/httpc.lua b/src/lua/httpc.lua
index e694f1fc1d..a8f058bf6a 100644
--- a/src/lua/httpc.lua
+++ b/src/lua/httpc.lua
@@ -424,7 +424,7 @@ local function io_read(self, opts, timeout)
     local len = check(self, chunk, delimiter)
     if len ~= nil then
         local data = ffi.string(rbuf.rpos, len)
-        rbuf.rpos = rbuf.rpos + len
+        rbuf:consume(len)
         return data
     end
 
@@ -438,15 +438,15 @@ local function io_read(self, opts, timeout)
             self._errno = nil
             local len = rbuf:size()
             local data = ffi.string(rbuf.rpos, len)
-            rbuf.rpos = rbuf.rpos + len
+            rbuf:consume(len)
             return data
         else
-            rbuf.wpos = rbuf.wpos + res
+            rbuf:alloc(res)
             local len = check(self, chunk, delimiter)
             if len ~= nil then
                 self._errno = nil
                 local data = ffi.string(rbuf.rpos, len)
-                rbuf.rpos = rbuf.rpos + len
+                rbuf:consume(len)
                 return data
             end
         end
diff --git a/src/lua/socket.lua b/src/lua/socket.lua
index 2858148049..2b6b59759d 100644
--- a/src/lua/socket.lua
+++ b/src/lua/socket.lua
@@ -705,7 +705,7 @@ local function read(self, limit, timeout, check, ...)
     if len ~= nil then
         self._errno = nil
         local data = ffi.string(rbuf.rpos, len)
-        rbuf.rpos = rbuf.rpos + len
+        rbuf:consume(len)
         return data
     end
 
@@ -720,15 +720,15 @@ local function read(self, limit, timeout, check, ...)
             self._errno = nil
             local len = rbuf:size()
             local data = ffi.string(rbuf.rpos, len)
-            rbuf.rpos = rbuf.rpos + len
+            rbuf:consume(len)
             return data
         elseif res ~= nil then
-            rbuf.wpos = rbuf.wpos + res
+            rbuf:alloc(res)
             local len = check(self, limit, ...)
             if len ~= nil then
                 self._errno = nil
                 local data = ffi.string(rbuf.rpos, len)
-                rbuf.rpos = rbuf.rpos + len
+                rbuf:consume(len)
                 return data
             end
         elseif not errno_is_transient[self._errno] then
-- 
GitLab