Skip to content
Snippets Groups Projects
Commit a2f1f048 authored by Denis Smirnov's avatar Denis Smirnov
Browse files

fix: migrate to rust-allocated tuples for virtual tables

parent 40294736
No related branches found
No related tags found
1 merge request!513feat: use rust allocated tuples for binary data
......@@ -4,7 +4,7 @@ use serde::{Deserialize, Serialize};
use smol_str::{format_smolstr, SmolStr};
use std::collections::HashMap;
use tarantool::tlua::{self, AsLua, Push, PushGuard, PushInto, PushOne, PushOneInto, Void};
use tarantool::tuple::{Tuple, TupleBuffer};
use tarantool::tuple::{Tuple, TupleBuilder};
use crate::backend::sql::tree::OrderedSyntaxNodes;
use crate::debug;
......@@ -48,16 +48,9 @@ where
return Err(msg);
}
let mut msgpack_header = [0_u8; 6];
// array of len 1
msgpack_header[0] = b'\x91';
// string with 32bit length
msgpack_header[1] = b'\xdb';
// 32bit length of string
msgpack_header[2..].copy_from_slice(&(bincode_size as u32).to_be_bytes());
let msgpack_header = msgpack_header_for_data(bincode_size as u32);
let capacity = msgpack_header.len() + bincode_size as usize;
let mut builder = tarantool::tuple::TupleBuilder::rust_allocated();
let mut builder = TupleBuilder::rust_allocated();
builder.reserve(capacity);
builder.append(&msgpack_header);
......@@ -86,17 +79,22 @@ where
Ok(tuple)
}
pub fn rust_allocated_tuple_from_bytes(data: &[u8]) -> Tuple {
/// Builds the 6-byte MessagePack prefix used for an encoded binary payload:
/// a one-element array (`0x91`) wrapping a `str32` value (`0xdb`) whose
/// 32-bit big-endian length is `data_len`. The payload bytes themselves are
/// appended by the caller right after this header.
pub fn msgpack_header_for_data(data_len: u32) -> [u8; 6] {
    // fixarray(1) marker, str32 marker, then 4 bytes of big-endian length.
    let mut header = [0x91_u8, 0xdb, 0, 0, 0, 0];
    header[2..].copy_from_slice(&data_len.to_be_bytes());
    header
}
pub fn rust_allocated_tuple_from_bytes(data: &[u8]) -> Tuple {
assert!(data.len() <= u32::MAX as usize);
let msgpack_header = msgpack_header_for_data(data.len() as u32);
let capacity = msgpack_header.len() + data.len();
let mut builder = tarantool::tuple::TupleBuilder::rust_allocated();
let mut builder = TupleBuilder::rust_allocated();
builder.reserve(capacity);
builder.append(&msgpack_header);
......@@ -240,13 +238,10 @@ impl<'e> IntoIterator for &'e EncodedRows {
type IntoIter = EncodedRowsIter<'e>;
fn into_iter(self) -> Self::IntoIter {
    // Wrap the raw encoded-tuple bytes in a byte stream; the iterator walks
    // it using `marking` (per-row byte lengths) to slice out each row.
    let stream = Bytes::from(self.encoded.0.data());
    EncodedRowsIter {
        stream,
        marking: &self.marking,
        position: 0,
    }
}
}
......@@ -258,8 +253,6 @@ pub struct EncodedRowsIter<'e> {
marking: &'e [usize],
/// Current stream position.
position: usize,
/// Buffer for encoded tuple.
buf: Vec<u8>,
}
impl<'e> Iterator for EncodedRowsIter<'e> {
......@@ -280,22 +273,17 @@ impl<'e> Iterator for EncodedRowsIter<'e> {
let Some(row_len) = self.marking.get(cur_pos) else {
return None;
};
// We take a preallocated buffer, clear it (preserving allocated rust memory) and
// fill it with encoded tuple (copying from stream). Then we create a tuple from
// this buffer (i.e. allocate tarantool memory and copy from buffer) and return
// rust memory back to the buffer vector. As a result we make only one tarantool
// memory allocation per tuple and two copies of encoded tuple bytes.
// TODO: refactor this code when we switch to rust-allocated tuples.
self.buf.clear();
let mut tmp = Vec::new();
std::mem::swap(&mut self.buf, &mut tmp);
tmp.resize(*row_len, 0);
self.stream.read_exact_buf(&mut tmp).expect("encoded tuple");
let tbuf = TupleBuffer::try_from_vec(tmp).expect("tuple buffer");
let tuple = Tuple::from(&tbuf);
// Return back previously allocated buffer.
self.buf = Vec::from(tbuf);
assert!(*row_len <= u32::MAX as usize);
let mut builder = TupleBuilder::rust_allocated();
builder.reserve(*row_len);
for _ in 0..*row_len {
let byte = self.stream.read_u8().expect("encoded tuple");
builder.append(&[byte]);
}
let tuple = builder
.into_tuple()
.expect("failed to create rust-allocated tuple");
Some(tuple)
}
}
......
use ahash::AHashSet;
use rmp::encode::write_array_len;
use rmp_serde::Serializer;
use serde::{Deserialize, Serialize};
use smol_str::{format_smolstr, SmolStr};
use std::any::Any;
use std::collections::{hash_map::Entry, HashMap};
use std::fmt::{Display, Formatter};
use std::io::Write;
use std::rc::Rc;
use std::vec;
use tarantool::tuple::TupleBuilder;
use crate::errors::{Entity, SbroadError};
use crate::executor::engine::helpers::{TupleBuilderCommand, TupleBuilderPattern};
use crate::executor::protocol::{Binary, EncodedRows, EncodedTables};
use crate::executor::protocol::{msgpack_header_for_data, Binary, EncodedRows, EncodedTables};
use crate::executor::{bucket::Buckets, Vshard};
use crate::ir::helpers::RepeatableState;
use crate::ir::node::NodeId;
use crate::ir::relation::{Column, ColumnRole, Type};
use crate::ir::transformation::redistribution::{ColumnPosition, MotionKey, Target};
use crate::ir::value::{EncodedValue, LuaValue, MsgPackValue, Value};
use crate::utils::{ByteCounter, SliceWriter};
use crate::utils::{write_u32_array_len, ByteCounter};
use super::ir::ExecutionPlan;
use super::result::{ExecutorTuple, MetadataColumn, ProducerResult};
......@@ -659,13 +660,12 @@ impl<'t> TupleIterator<'t> {
}
}
fn write_vtable_as_msgpack(vtable: &VirtualTable, buf: &mut [u8]) {
let mut stream = SliceWriter::new(buf);
fn write_vtable_as_msgpack(vtable: &VirtualTable, stream: &mut impl Write) {
let array_len =
u32::try_from(vtable.get_tuples().len()).expect("expected u32 tuples in virtual table");
write_array_len(&mut stream, array_len).expect("failed to write array length");
write_u32_array_len(stream, array_len).expect("failed to write array len");
let mut ser = Serializer::new(&mut stream);
let mut ser = Serializer::new(stream);
let mut tuple_iter = TupleIterator::new(vtable);
while let Some(tuple) = tuple_iter.next() {
......@@ -698,11 +698,18 @@ impl ExecutionPlan {
for (id, vtable) in vtables {
let marking = vtable_marking(vtable);
// Array length marker (1 byte) + array length (up to 4 bytes) + tuples.
let capacity = 5 + marking.iter().sum::<usize>();
let mut buf = vec![0; capacity];
write_vtable_as_msgpack(vtable, &mut buf);
let binary_table = Binary::from(buf);
// Array marker (1 byte) + array length (4 bytes) + tuples.
let data_len = 5 + marking.iter().sum::<usize>();
assert!(data_len <= u32::MAX as usize);
let msgpack_header = msgpack_header_for_data(data_len as u32);
let mut builder = TupleBuilder::rust_allocated();
builder.reserve(data_len + msgpack_header.len());
builder.append(&msgpack_header);
write_vtable_as_msgpack(vtable, &mut builder);
let Ok(tuple) = builder.into_tuple() else {
unreachable!("failed to build binary table")
};
let binary_table = Binary::from(tuple);
encoded_tables.insert(*id, EncodedRows::new(marking, binary_table));
}
encoded_tables
......
use std::cell::{RefCell, RefMut};
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, RandomState};
use std::io::{Error, Result, Write};
use std::io::{Result, Write};
use std::ops::DerefMut;
use rmp::encode::RmpWrite;
use rmp::Marker;
use tarantool::fiber::mutex::MutexGuard as TMutexGuard;
use tarantool::fiber::Mutex as TMutex;
......@@ -58,41 +60,12 @@ impl Write for ByteCounter {
}
}
#[derive(Debug, Default)]
pub struct SliceWriter<'b> {
buf: &'b mut [u8],
pos: usize,
}
impl SliceWriter<'_> {
#[must_use]
pub fn new(buf: &mut [u8]) -> SliceWriter {
SliceWriter { buf, pos: 0 }
}
}
impl<'b> Write for SliceWriter<'b> {
fn write(&mut self, buf: &[u8]) -> Result<usize> {
let len = buf.len();
if len > self.buf.len() - self.pos {
return Err(Error::new(
std::io::ErrorKind::WriteZero,
format!(
"no space left in buffer (position = {}, length = {}, requested = {})",
self.pos,
self.buf.len(),
len
),
));
}
self.buf[self.pos..self.pos + len].copy_from_slice(buf);
self.pos += len;
Ok(len)
}
fn flush(&mut self) -> Result<()> {
Ok(())
}
/// Writes a MessagePack `array32` length prefix (marker byte `0xdd` followed
/// by the 32-bit big-endian element count) into `stream`.
///
/// Unlike `rmp::encode::write_array_len`, this always emits the fixed-width
/// `array32` form, so the prefix size is a constant 5 bytes regardless of `len`.
///
/// # Errors
/// Propagates any I/O error returned by the underlying writer.
pub fn write_u32_array_len<W: Write>(stream: &mut W, len: u32) -> Result<()> {
    // 0xdd is Marker::Array32. `to_be_bytes` already yields the [u8; 4] we
    // need, so write it directly instead of copying through a temp buffer.
    stream.write_all(&[0xdd])?;
    stream.write_all(&len.to_be_bytes())?;
    Ok(())
}
pub struct OrderedMap<K, V, S = RandomState> {
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment