Skip to content
Snippets Groups Projects
Commit 9082ea4c authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon:
Browse files

feat: msgpack::ValueIter

parent a61d2e85
No related branches found
No related tags found
1 merge request!313Feat/msgpack array writer
......@@ -20,6 +20,7 @@
linked object file in which the given symbol is defined.
- `msgpack::ArrayWriter` helper struct for generating msgpack arrays from
arbitrary serializable data.
- `msgpack::ValueIter` helper struct for iterating over msgpack values.
- `tarantool::network::client` alternative async network client.
- `tarantool::network::client::reconnect` reconnecting async network client based on `network::client`.
- `tarantool::network::protocol` sans-io (without transport layer) implementation of Tarantool Binary Protocol.
......
use std::io::Cursor;
use std::io::{Read, Seek, SeekFrom};
use byteorder::{BigEndian, ReadBytesExt};
use super::tuple::ToTupleBuffer;
use super::tuple::{Decode, RawBytes, ToTupleBuffer};
use crate::Result;
pub fn skip_value(cur: &mut (impl Read + Seek)) -> Result<()> {
......@@ -160,12 +161,12 @@ pub struct ArrayWriter<W> {
len: u32,
}
impl ArrayWriter<std::io::Cursor<Vec<u8>>> {
impl ArrayWriter<Cursor<Vec<u8>>> {
/// Create an `ArrayWriter` using a `Vec<u8>` as the underlying buffer.
#[track_caller]
#[inline(always)]
pub fn from_vec(buf: Vec<u8>) -> Self {
Self::new(std::io::Cursor::new(buf)).expect("allocation error")
Self::new(Cursor::new(buf)).expect("allocation error")
}
}
......@@ -256,6 +257,85 @@ where
}
}
////////////////////////////////////////////////////////////////////////////////
// ArrayIter
////////////////////////////////////////////////////////////////////////////////
/// A helper struct for iterating over msgpack values.
///
/// # Example
/// ```
/// use tarantool::msgpack::ValueIter;
/// let mut value_iter = ValueIter::from_array(b"\x93*\xc0\xa3yes").unwrap();
/// // You can decode the next value
/// assert_eq!(value_iter.decode_next::<i64>().map(Result::ok).flatten(), Some(42));
/// // Or just get the raw slice of bytes
/// assert_eq!(value_iter.next(), Some(&b"\xc0"[..]));
/// assert_eq!(value_iter.decode_next::<String>().map(Result::ok).flatten(), Some("yes".to_owned()));
///
/// // Returns None when there's no more values
/// assert_eq!(value_iter.decode_next::<String>().map(Result::ok), None);
/// assert_eq!(value_iter.next(), None);
/// ```
#[derive(Debug)]
pub struct ValueIter<'a> {
cursor: Cursor<&'a [u8]>,
}
impl<'a> ValueIter<'a> {
/// Return an iterator over elements of msgpack `array`, or error in case
/// `array` doesn't start with a valid msgpack array marker.
pub fn from_array(array: &'a [u8]) -> std::result::Result<Self, rmp::decode::ValueReadError> {
let mut cursor = Cursor::new(array);
// Don't care about length, will just exhaust all the values in the slice
rmp::decode::read_array_len(&mut cursor)?;
Ok(Self { cursor })
}
/// Return an iterator over msgpack values packed one after another in `data`.
pub fn new(data: &'a [u8]) -> Self {
Self {
cursor: Cursor::new(data),
}
}
/// Return an iterator over msgpack values packed one after another in `data`.
pub fn decode_next<T>(&mut self) -> Option<Result<T>>
where
T: Decode<'a>,
{
if self.cursor.position() as usize >= self.cursor.get_ref().len() {
return None;
}
let start = self.cursor.position() as usize;
if let Err(e) = skip_value(&mut self.cursor) {
return Some(Err(e));
}
let end = self.cursor.position() as usize;
debug_assert_ne!(start, end, "skip_value should've returned Err in this case");
let data = &self.cursor.get_ref()[start..end];
Some(T::decode(data))
}
pub fn into_inner(self) -> Cursor<&'a [u8]> {
self.cursor
}
}
impl<'a> Iterator for ValueIter<'a> {
type Item = &'a [u8];
#[inline(always)]
fn next(&mut self) -> Option<&'a [u8]> {
self.decode_next::<&RawBytes>()?.ok().map(|b| &**b)
}
}
////////////////////////////////////////////////////////////////////////////////
// test
////////////////////////////////////////////////////////////////////////////////
#[cfg(test)]
mod test {
use super::*;
......@@ -274,4 +354,47 @@ mod test {
((420, "foo".to_owned()), "bar".to_owned(), "baz".to_owned())
);
}
#[test]
fn value_iter() {
let mut iter = ValueIter::new(b"");
assert_eq!(iter.next(), None);
let mut iter = ValueIter::new(b"*");
assert_eq!(iter.next(), Some(&b"*"[..]));
assert_eq!(iter.next(), None);
let err = ValueIter::from_array(b"").unwrap_err();
assert_eq!(err.to_string(), "failed to read MessagePack marker");
let mut iter = ValueIter::from_array(b"\x99").unwrap();
assert_eq!(iter.next(), None);
let mut iter = ValueIter::from_array(b"\x99*").unwrap();
assert_eq!(iter.next(), Some(&b"*"[..]));
assert_eq!(iter.next(), None);
let data = b"\x93*\x93\xc0\xc2\xc3\xa3sup";
let mut iter = ValueIter::from_array(data).unwrap();
let v: u32 = iter.decode_next().unwrap().unwrap();
assert_eq!(v, 42);
let v: Vec<Option<bool>> = iter.decode_next().unwrap().unwrap();
assert_eq!(v, [None, Some(false), Some(true)]);
let v: String = iter.decode_next().unwrap().unwrap();
assert_eq!(v, "sup");
let mut iter = ValueIter::from_array(data).unwrap();
let v = iter.next().unwrap();
assert_eq!(v, b"*");
let v = iter.next().unwrap();
assert_eq!(v, b"\x93\xc0\xc2\xc3");
let v = iter.next().unwrap();
assert_eq!(v, b"\xa3sup");
let mut iter = ValueIter::new(data);
let v: (u32, Vec<Option<bool>>, String) =
rmp_serde::from_slice(iter.next().unwrap()).unwrap();
assert_eq!(v, (42, vec![None, Some(false), Some(true)], "sup".into()));
}
}
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment