From 9f8f90f0d880c34a48aab71375c4996a1ceb2984 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Tue, 17 Oct 2023 21:02:16 +0300 Subject: [PATCH] fix: set non-default max_size_per_msg configuration --- src/cas.rs | 2 +- src/schema.rs | 2 +- src/traft/node.rs | 3 +++ src/traft/raft_storage.rs | 20 +++++++++++++++----- 4 files changed, 20 insertions(+), 7 deletions(-) diff --git a/src/cas.rs b/src/cas.rs index cc3d6a4967..e4372ea041 100644 --- a/src/cas.rs +++ b/src/cas.rs @@ -239,7 +239,7 @@ fn proc_cas_local(req: Request) -> Result<Response> { if requested < last_persisted { // there's at least one persisted entry to check - let persisted = raft_storage.entries(requested + 1, last_persisted + 1)?; + let persisted = raft_storage.entries(requested + 1, last_persisted + 1, None)?; if persisted.len() < (last_persisted - requested) as usize { return Err(RaftError::Store(StorageError::Unavailable).into()); } diff --git a/src/schema.rs b/src/schema.rs index d8bea39066..c43751b148 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1023,7 +1023,7 @@ pub fn wait_for_ddl_commit( let last_seen = prepare_commit; loop { let cur_applied = node.get_index(); - let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1)?; + let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1, None)?; for entry in new_entries { if entry.entry_type != raft::prelude::EntryType::EntryNormal { continue; diff --git a/src/traft/node.rs b/src/traft/node.rs index d9fc150163..f90c9d8837 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -454,6 +454,9 @@ impl NodeImpl { id: raft_id, applied, pre_vote: true, + // XXX: this value is pretty random, we should really do some + // testing to determine the best value for it. + max_size_per_msg: 64, ..Default::default() }; diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs index e6c7f47d60..212a4a03a9 100644 --- a/src/traft/raft_storage.rs +++ b/src/traft/raft_storage.rs @@ -242,15 +242,24 @@ impl RaftSpaceAccess { /// /// Panics if `high` < `low`. /// - pub fn entries(&self, low: RaftIndex, high: RaftIndex) -> tarantool::Result<Vec<traft::Entry>> { + pub fn entries( + &self, + low: RaftIndex, + high: RaftIndex, + limit: Option<usize>, + ) -> tarantool::Result<Vec<traft::Entry>> { let iter = self.space_raft_log.select(IteratorType::GE, &(low,))?; - let mut ret = Vec::with_capacity((high - low) as _); + let limit = limit.unwrap_or(usize::MAX); + let mut ret = Vec::with_capacity(limit.min((high - low) as _)); for tuple in iter { let row = tuple.decode::<traft::Entry>()?; if row.index >= high { break; } + if ret.len() == limit { + break; + } ret.push(row); } @@ -530,14 +539,15 @@ impl raft::Storage for RaftSpaceAccess { &self, low: RaftIndex, high: RaftIndex, - _max_size: impl Into<Option<u64>>, + limit: impl Into<Option<u64>>, ) -> Result<Vec<raft::Entry>, RaftError> { if low <= self.compacted_index().cvt_err()? { return Err(RaftError::Store(StorageError::Compacted)); } - let ret: Vec<traft::Entry> = self.entries(low, high).cvt_err()?; - if ret.len() < (high - low) as usize { + let limit = limit.into().map(|l| l as usize); + let ret: Vec<traft::Entry> = self.entries(low, high, limit).cvt_err()?; + if ret.len() < limit.unwrap_or(usize::MAX).min((high - low) as usize) { return Err(RaftError::Store(StorageError::Unavailable)); } Ok(ret.into_iter().map(Into::into).collect()) -- GitLab