From 7b64d6bc49bf43455b16f3f0e237c53ec44dd6ae Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 15 Dec 2022 16:47:48 +0300 Subject: [PATCH] refactor(storage): one EntryIter for all storage structs --- src/governor/mod.rs | 1 + src/storage.rs | 172 ++++++++++++++++---------------------- src/traft/node.rs | 1 + src/traft/rpc/join.rs | 1 + src/traft/rpc/sharding.rs | 1 + 5 files changed, 76 insertions(+), 100 deletions(-) diff --git a/src/governor/mod.rs b/src/governor/mod.rs index 71a961d56f..f308111f83 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -9,6 +9,7 @@ use ::tarantool::space::UpdateOps; use crate::event::{self, Event}; use crate::r#loop::FlowControl::{self, Continue}; +use crate::storage::ToEntryIter as _; use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::tlog; use crate::traft::network::{ConnectionPool, IdOfInstance}; diff --git a/src/storage.rs b/src/storage.rs index 2ff22a95d7..2db5d86d40 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -225,33 +225,14 @@ impl Replicasets { None => Ok(None), } } - - #[inline] - pub fn iter(&self) -> Result<ReplicasetIter> { - let iter = self.space.select(IteratorType::All, &())?; - Ok(iter.into()) - } } -//////////////////////////////////////////////////////////////////////////////// -// ReplicasetIter -//////////////////////////////////////////////////////////////////////////////// - -pub struct ReplicasetIter { - iter: IndexIterator, -} - -impl From<IndexIterator> for ReplicasetIter { - fn from(iter: IndexIterator) -> Self { - Self { iter } - } -} +impl ToEntryIter for Replicasets { + type Entry = Replicaset; -impl Iterator for ReplicasetIter { - type Item = traft::Replicaset; - fn next(&mut self) -> Option<Self::Item> { - let res = self.iter.next().as_ref().map(Tuple::decode); - res.map(|res| res.expect("replicaset should decode correctly")) + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) } } @@ -317,35 +298,14 @@ impl PeerAddresses { self.get(raft_id)? .ok_or(Error::AddressUnknownForRaftId(raft_id)) } - - #[allow(dead_code)] - #[inline] - pub fn iter(&self) -> tarantool::Result<PeerAddressIter> { - let iter = self.space.select(IteratorType::All, &())?; - Ok(PeerAddressIter::new(iter)) - } } -//////////////////////////////////////////////////////////////////////////////// -// PeerAddressIter -//////////////////////////////////////////////////////////////////////////////// +impl ToEntryIter for PeerAddresses { + type Entry = traft::PeerAddress; -pub struct PeerAddressIter { - iter: IndexIterator, -} - -#[allow(dead_code)] -impl PeerAddressIter { - fn new(iter: IndexIterator) -> Self { - Self { iter } - } -} - -impl Iterator for PeerAddressIter { - type Item = traft::PeerAddress; - fn next(&mut self) -> Option<Self::Item> { - let res = self.iter.next().as_ref().map(Tuple::decode); - res.map(|res| res.expect("peer address should decode correctly")) + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) } } @@ -452,12 +412,6 @@ impl Instances { Ok(InstancesFields::new(iter)) } - #[inline] - pub fn iter(&self) -> tarantool::Result<InstanceIter> { - let iter = self.space.select(IteratorType::All, &())?; - Ok(InstanceIter::new(iter)) - } - #[inline] pub fn all_instances(&self) -> tarantool::Result<Vec<traft::Instance>> { self.space @@ -466,11 +420,14 @@ impl Instances { .collect() } - pub fn replicaset_instances(&self, replicaset_id: &str) -> tarantool::Result<InstanceIter> { + pub fn replicaset_instances( + &self, + replicaset_id: &str, + ) -> tarantool::Result<EntryIter<traft::Instance>> { let iter = self .index_replicaset_id .select(IteratorType::Eq, &[replicaset_id])?; - Ok(InstanceIter::new(iter)) + Ok(EntryIter::new(iter)) } pub fn replicaset_fields<T>( @@ -487,6 +444,15 @@ impl Instances { } } +impl ToEntryIter for Instances { + type Entry = traft::Instance; + + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) + } +} + //////////////////////////////////////////////////////////////////////////////// // InstanceField //////////////////////////////////////////////////////////////////////////////// @@ -648,34 +614,6 @@ impl InstanceId for traft::InstanceId { } } -//////////////////////////////////////////////////////////////////////////////// -// InstanceIter -//////////////////////////////////////////////////////////////////////////////// - -pub struct InstanceIter { - iter: IndexIterator, -} - -impl InstanceIter { - fn new(iter: IndexIterator) -> Self { - Self { iter } - } -} - -impl Iterator for InstanceIter { - type Item = traft::Instance; - fn next(&mut self) -> Option<Self::Item> { - let res = self.iter.next().as_ref().map(Tuple::decode); - res.map(|res| res.expect("instance should decode correctly")) - } -} - -impl std::fmt::Debug for InstanceIter { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("InstanceIter").finish_non_exhaustive() - } -} - //////////////////////////////////////////////////////////////////////////////// // InstancesFields //////////////////////////////////////////////////////////////////////////////// @@ -712,7 +650,6 @@ where #[derive(Clone, Debug)] pub struct Migrations { - #[allow(dead_code)] space: Space, } @@ -750,37 +687,72 @@ impl Migrations { #[inline] pub fn get_latest(&self) -> tarantool::Result<Option<Migration>> { let iter = self.space.select(IteratorType::Req, &())?; - let iter = MigrationIter::from(iter); + let iter = EntryIter::new(iter); let ms = iter.take(1).collect::<Vec<_>>(); Ok(ms.first().cloned()) } +} - #[inline] - pub fn iter(&self) -> Result<MigrationIter> { - let iter = self.space.select(IteratorType::All, &())?; - Ok(iter.into()) +impl ToEntryIter for Migrations { + type Entry = Migration; + + #[inline(always)] + fn index_iter(&self) -> Result<IndexIterator> { + Ok(self.space.select(IteratorType::All, &())?) } } //////////////////////////////////////////////////////////////////////////////// -// MigrationtIter +// EntryIter //////////////////////////////////////////////////////////////////////////////// -pub struct MigrationIter { +/// This trait is implemented for storage structs for iterating over the entries +/// from that storage. +pub trait ToEntryIter { + /// Target type for entry deserialization. + type Entry; + + fn index_iter(&self) -> Result<IndexIterator>; + + #[inline(always)] + fn iter(&self) -> Result<EntryIter<Self::Entry>> { + Ok(EntryIter::new(self.index_iter()?)) + } +} + +/// An iterator struct for automatically deserializing tuples into a given type. +/// +/// # Panics +/// Will panic in case deserialization fails on a given iteration. +pub struct EntryIter<T> { iter: IndexIterator, + marker: PhantomData<T>, } -impl From<IndexIterator> for MigrationIter { - fn from(iter: IndexIterator) -> Self { - Self { iter } +impl<T> EntryIter<T> { + pub fn new(iter: IndexIterator) -> Self { + Self { + iter, + marker: PhantomData, + } } } -impl Iterator for MigrationIter { - type Item = traft::Migration; +impl<T> Iterator for EntryIter<T> +where + T: DecodeOwned, +{ + type Item = T; fn next(&mut self) -> Option<Self::Item> { let res = self.iter.next().as_ref().map(Tuple::decode); - res.map(|res| res.expect("migration should decode correctly")) + res.map(|res| res.expect("entry should decode correctly")) + } +} + +impl<T> std::fmt::Debug for EntryIter<T> { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct(std::any::type_name::<Self>()) + .finish_non_exhaustive() } } diff --git a/src/traft/node.rs b/src/traft/node.rs index 9389cc1ce2..dbe775dd9c 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -29,6 +29,7 @@ use crate::governor; use crate::kvcell::KVCell; use crate::loop_start; use crate::r#loop::FlowControl; +use crate::storage::ToEntryIter as _; use crate::storage::{Clusterwide, ClusterwideSpace, PropertyName}; use crate::stringify_cfunc; use crate::traft::ContextCoercion as _; diff --git a/src/traft/rpc/join.rs b/src/traft/rpc/join.rs index e4c822242f..712d9884c6 100644 --- a/src/traft/rpc/join.rs +++ b/src/traft/rpc/join.rs @@ -1,3 +1,4 @@ +use crate::storage::ToEntryIter as _; use crate::traft::{ error::Error, node, Address, FailureDomain, Instance, InstanceId, PeerAddress, ReplicasetId, Result, diff --git a/src/traft/rpc/sharding.rs b/src/traft/rpc/sharding.rs index 853ec95da3..5ad8a57faf 100644 --- a/src/traft/rpc/sharding.rs +++ b/src/traft/rpc/sharding.rs @@ -86,6 +86,7 @@ pub mod bootstrap { #[rustfmt::skip] pub mod cfg { use crate::storage::Clusterwide; + use crate::storage::ToEntryIter as _; use crate::traft::{Result, ReplicasetId}; use ::tarantool::tlua; -- GitLab