From a2f1f0487fa3e831fe28a9d4154dbd3d87f3d7b2 Mon Sep 17 00:00:00 2001
From: Denis Smirnov <sd@picodata.io>
Date: Wed, 11 Sep 2024 12:42:30 +0700
Subject: [PATCH] fix: migrate to rust-allocated tuples for virtual tables

---
 sbroad-core/src/executor/protocol.rs | 56 +++++++++++-----------------
 sbroad-core/src/executor/vtable.rs   | 31 +++++++++------
 sbroad-core/src/utils.rs             | 45 +++++-----------------
 3 files changed, 50 insertions(+), 82 deletions(-)

diff --git a/sbroad-core/src/executor/protocol.rs b/sbroad-core/src/executor/protocol.rs
index 42c57c697..1a30b3fbb 100644
--- a/sbroad-core/src/executor/protocol.rs
+++ b/sbroad-core/src/executor/protocol.rs
@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
 use smol_str::{format_smolstr, SmolStr};
 use std::collections::HashMap;
 use tarantool::tlua::{self, AsLua, Push, PushGuard, PushInto, PushOne, PushOneInto, Void};
-use tarantool::tuple::{Tuple, TupleBuffer};
+use tarantool::tuple::{Tuple, TupleBuilder};
 
 use crate::backend::sql::tree::OrderedSyntaxNodes;
 use crate::debug;
@@ -48,16 +48,9 @@ where
         return Err(msg);
     }
 
-    let mut msgpack_header = [0_u8; 6];
-    // array of len 1
-    msgpack_header[0] = b'\x91';
-    // string with 32bit length
-    msgpack_header[1] = b'\xdb';
-    // 32bit length of string
-    msgpack_header[2..].copy_from_slice(&(bincode_size as u32).to_be_bytes());
-
+    let msgpack_header = msgpack_header_for_data(bincode_size as u32);
     let capacity = msgpack_header.len() + bincode_size as usize;
-    let mut builder = tarantool::tuple::TupleBuilder::rust_allocated();
+    let mut builder = TupleBuilder::rust_allocated();
     builder.reserve(capacity);
     builder.append(&msgpack_header);
 
@@ -86,17 +79,22 @@ where
     Ok(tuple)
 }
 
-pub fn rust_allocated_tuple_from_bytes(data: &[u8]) -> Tuple {
+pub fn msgpack_header_for_data(data_len: u32) -> [u8; 6] {
     let mut msgpack_header = [0_u8; 6];
     // array of len 1
     msgpack_header[0] = b'\x91';
     // string with 32bit length
     msgpack_header[1] = b'\xdb';
     // 32bit length of string
-    msgpack_header[2..].copy_from_slice(&(data.len() as u32).to_be_bytes());
+    msgpack_header[2..].copy_from_slice(&data_len.to_be_bytes());
+    msgpack_header
+}
 
+pub fn rust_allocated_tuple_from_bytes(data: &[u8]) -> Tuple {
+    assert!(data.len() <= u32::MAX as usize);
+    let msgpack_header = msgpack_header_for_data(data.len() as u32);
     let capacity = msgpack_header.len() + data.len();
-    let mut builder = tarantool::tuple::TupleBuilder::rust_allocated();
+    let mut builder = TupleBuilder::rust_allocated();
     builder.reserve(capacity);
     builder.append(&msgpack_header);
 
@@ -240,13 +238,10 @@ impl<'e> IntoIterator for &'e EncodedRows {
     type IntoIter = EncodedRowsIter<'e>;
 
     fn into_iter(self) -> Self::IntoIter {
-        let capacity = *self.marking.iter().max().unwrap_or(&0);
         EncodedRowsIter {
             stream: Bytes::from(self.encoded.0.data()),
             marking: &self.marking,
             position: 0,
-            // Allocate buffer for encoded row.
-            buf: Vec::with_capacity(capacity),
         }
     }
 }
@@ -258,8 +253,6 @@ pub struct EncodedRowsIter<'e> {
     marking: &'e [usize],
     /// Current stream position.
     position: usize,
-    /// Buffer for encoded tuple.
-    buf: Vec<u8>,
 }
 
 impl<'e> Iterator for EncodedRowsIter<'e> {
@@ -280,22 +273,17 @@ impl<'e> Iterator for EncodedRowsIter<'e> {
         let Some(row_len) = self.marking.get(cur_pos) else {
             return None;
         };
-        // We take a preallocated buffer, clear it (preserving allocated rust memory) and
-        // fill it with encoded tuple (copying from stream). Then we create a tuple from
-        // this buffer (i.e. allocate tarantool memory and copy from buffer) and return
-        // rust memory back to the buffer vector. As a result we make only one tarantool
-        // memory allocation per tuple and two copies of encoded tuple bytes.
-
-        // TODO: refactor this code when we switch to rust-allocated tuples.
-        self.buf.clear();
-        let mut tmp = Vec::new();
-        std::mem::swap(&mut self.buf, &mut tmp);
-        tmp.resize(*row_len, 0);
-        self.stream.read_exact_buf(&mut tmp).expect("encoded tuple");
-        let tbuf = TupleBuffer::try_from_vec(tmp).expect("tuple buffer");
-        let tuple = Tuple::from(&tbuf);
-        // Return back previously allocated buffer.
-        self.buf = Vec::from(tbuf);
+
+        assert!(*row_len <= u32::MAX as usize);
+        let mut builder = TupleBuilder::rust_allocated();
+        builder.reserve(*row_len);
+        for _ in 0..*row_len {
+            let byte = self.stream.read_u8().expect("encoded tuple");
+            builder.append(&[byte]);
+        }
+        let tuple = builder
+            .into_tuple()
+            .expect("failed to create rust-allocated tuple");
         Some(tuple)
     }
 }
diff --git a/sbroad-core/src/executor/vtable.rs b/sbroad-core/src/executor/vtable.rs
index 6e2befb0c..2705b2fe4 100644
--- a/sbroad-core/src/executor/vtable.rs
+++ b/sbroad-core/src/executor/vtable.rs
@@ -1,24 +1,25 @@
 use ahash::AHashSet;
-use rmp::encode::write_array_len;
 use rmp_serde::Serializer;
 use serde::{Deserialize, Serialize};
 use smol_str::{format_smolstr, SmolStr};
 use std::any::Any;
 use std::collections::{hash_map::Entry, HashMap};
 use std::fmt::{Display, Formatter};
+use std::io::Write;
 use std::rc::Rc;
 use std::vec;
+use tarantool::tuple::TupleBuilder;
 
 use crate::errors::{Entity, SbroadError};
 use crate::executor::engine::helpers::{TupleBuilderCommand, TupleBuilderPattern};
-use crate::executor::protocol::{Binary, EncodedRows, EncodedTables};
+use crate::executor::protocol::{msgpack_header_for_data, Binary, EncodedRows, EncodedTables};
 use crate::executor::{bucket::Buckets, Vshard};
 use crate::ir::helpers::RepeatableState;
 use crate::ir::node::NodeId;
 use crate::ir::relation::{Column, ColumnRole, Type};
 use crate::ir::transformation::redistribution::{ColumnPosition, MotionKey, Target};
 use crate::ir::value::{EncodedValue, LuaValue, MsgPackValue, Value};
-use crate::utils::{ByteCounter, SliceWriter};
+use crate::utils::{write_u32_array_len, ByteCounter};
 
 use super::ir::ExecutionPlan;
 use super::result::{ExecutorTuple, MetadataColumn, ProducerResult};
@@ -659,13 +660,12 @@ impl<'t> TupleIterator<'t> {
     }
 }
 
-fn write_vtable_as_msgpack(vtable: &VirtualTable, buf: &mut [u8]) {
-    let mut stream = SliceWriter::new(buf);
+fn write_vtable_as_msgpack(vtable: &VirtualTable, stream: &mut impl Write) {
     let array_len =
         u32::try_from(vtable.get_tuples().len()).expect("expected u32 tuples in virtual table");
-    write_array_len(&mut stream, array_len).expect("failed to write array length");
+    write_u32_array_len(stream, array_len).expect("failed to write array len");
 
-    let mut ser = Serializer::new(&mut stream);
+    let mut ser = Serializer::new(stream);
     let mut tuple_iter = TupleIterator::new(vtable);
 
     while let Some(tuple) = tuple_iter.next() {
@@ -698,11 +698,18 @@ impl ExecutionPlan {
 
         for (id, vtable) in vtables {
             let marking = vtable_marking(vtable);
-            // Array length marker (1 byte) + array length (up to 4 bytes) + tuples.
-            let capacity = 5 + marking.iter().sum::<usize>();
-            let mut buf = vec![0; capacity];
-            write_vtable_as_msgpack(vtable, &mut buf);
-            let binary_table = Binary::from(buf);
+            // Array marker (1 byte) + array length (4 bytes) + tuples.
+            let data_len = 5 + marking.iter().sum::<usize>();
+            assert!(data_len <= u32::MAX as usize);
+            let msgpack_header = msgpack_header_for_data(data_len as u32);
+            let mut builder = TupleBuilder::rust_allocated();
+            builder.reserve(data_len + msgpack_header.len());
+            builder.append(&msgpack_header);
+            write_vtable_as_msgpack(vtable, &mut builder);
+            let Ok(tuple) = builder.into_tuple() else {
+                unreachable!("failed to build binary table")
+            };
+            let binary_table = Binary::from(tuple);
             encoded_tables.insert(*id, EncodedRows::new(marking, binary_table));
         }
         encoded_tables
diff --git a/sbroad-core/src/utils.rs b/sbroad-core/src/utils.rs
index 21a30c688..b11907718 100644
--- a/sbroad-core/src/utils.rs
+++ b/sbroad-core/src/utils.rs
@@ -1,9 +1,11 @@
 use std::cell::{RefCell, RefMut};
 use std::collections::HashMap;
 use std::hash::{BuildHasher, Hash, RandomState};
-use std::io::{Error, Result, Write};
+use std::io::{Result, Write};
 use std::ops::DerefMut;
 
+use rmp::encode::RmpWrite;
+use rmp::Marker;
 use tarantool::fiber::mutex::MutexGuard as TMutexGuard;
 use tarantool::fiber::Mutex as TMutex;
 
@@ -58,41 +60,12 @@ impl Write for ByteCounter {
     }
 }
 
-#[derive(Debug, Default)]
-pub struct SliceWriter<'b> {
-    buf: &'b mut [u8],
-    pos: usize,
-}
-
-impl SliceWriter<'_> {
-    #[must_use]
-    pub fn new(buf: &mut [u8]) -> SliceWriter {
-        SliceWriter { buf, pos: 0 }
-    }
-}
-
-impl<'b> Write for SliceWriter<'b> {
-    fn write(&mut self, buf: &[u8]) -> Result<usize> {
-        let len = buf.len();
-        if len > self.buf.len() - self.pos {
-            return Err(Error::new(
-                std::io::ErrorKind::WriteZero,
-                format!(
-                    "no space left in buffer (position = {}, length = {}, requested = {})",
-                    self.pos,
-                    self.buf.len(),
-                    len
-                ),
-            ));
-        }
-        self.buf[self.pos..self.pos + len].copy_from_slice(buf);
-        self.pos += len;
-        Ok(len)
-    }
-
-    fn flush(&mut self) -> Result<()> {
-        Ok(())
-    }
+/// Writes a msgpack `array 32` header: the marker byte followed by `len` in big-endian order.
+/// The 5-byte form is emitted for every `len`, so callers can reserve a fixed-size header.
+pub fn write_u32_array_len<W: Write>(stream: &mut W, len: u32) -> Result<()> {
+    stream.write_bytes(&[Marker::Array32.to_u8()])?;
+    stream.write_bytes(&len.to_be_bytes())?;
+    Ok(())
 }
 
 pub struct OrderedMap<K, V, S = RandomState> {
-- 
GitLab