diff --git a/src/cas.rs b/src/cas.rs index cc3d6a49671b0102e2431e0b7d16d5ae43d83c2b..e4372ea041036487a6d4caf3dd0a610cd4d0c420 100644 --- a/src/cas.rs +++ b/src/cas.rs @@ -239,7 +239,7 @@ fn proc_cas_local(req: Request) -> Result<Response> { if requested < last_persisted { // there's at least one persisted entry to check - let persisted = raft_storage.entries(requested + 1, last_persisted + 1)?; + let persisted = raft_storage.entries(requested + 1, last_persisted + 1, None)?; if persisted.len() < (last_persisted - requested) as usize { return Err(RaftError::Store(StorageError::Unavailable).into()); } diff --git a/src/schema.rs b/src/schema.rs index d8bea3906626befc83e16db4da2ad08a8758c7d6..c43751b1489b291f27ad3e8a307e479929f05570 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -1023,7 +1023,7 @@ pub fn wait_for_ddl_commit( let last_seen = prepare_commit; loop { let cur_applied = node.get_index(); - let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1)?; + let new_entries = raft_storage.entries(last_seen + 1, cur_applied + 1, None)?; for entry in new_entries { if entry.entry_type != raft::prelude::EntryType::EntryNormal { continue; diff --git a/src/traft/node.rs b/src/traft/node.rs index d9fc15016316834260a791e4fb37f29701330ec2..f90c9d8837e7e93c442841ac364544306a2e8297 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -454,6 +454,9 @@ impl NodeImpl { id: raft_id, applied, pre_vote: true, + // XXX: this value is pretty random, we should really do some + // testing to determine the best value for it. + max_size_per_msg: 64, ..Default::default() }; diff --git a/src/traft/raft_storage.rs b/src/traft/raft_storage.rs index e6c7f47d607678484bff1eec4d0f29e1f1ea08e8..212a4a03a9c63595861e2b939240b492d9e18956 100644 --- a/src/traft/raft_storage.rs +++ b/src/traft/raft_storage.rs @@ -242,15 +242,24 @@ impl RaftSpaceAccess { /// /// Panics if `high` < `low`. /// - pub fn entries(&self, low: RaftIndex, high: RaftIndex) -> tarantool::Result<Vec<traft::Entry>> { + pub fn entries( + &self, + low: RaftIndex, + high: RaftIndex, + limit: Option<usize>, + ) -> tarantool::Result<Vec<traft::Entry>> { let iter = self.space_raft_log.select(IteratorType::GE, &(low,))?; - let mut ret = Vec::with_capacity((high - low) as _); + let limit = limit.unwrap_or(usize::MAX); + let mut ret = Vec::with_capacity(limit.min((high - low) as _)); for tuple in iter { let row = tuple.decode::<traft::Entry>()?; if row.index >= high { break; } + if ret.len() == limit { + break; + } ret.push(row); } @@ -530,14 +539,15 @@ impl raft::Storage for RaftSpaceAccess { &self, low: RaftIndex, high: RaftIndex, - _max_size: impl Into<Option<u64>>, + limit: impl Into<Option<u64>>, ) -> Result<Vec<raft::Entry>, RaftError> { if low <= self.compacted_index().cvt_err()? { return Err(RaftError::Store(StorageError::Compacted)); } - let ret: Vec<traft::Entry> = self.entries(low, high).cvt_err()?; - if ret.len() < (high - low) as usize { + let limit = limit.into().map(|l| l as usize); + let ret: Vec<traft::Entry> = self.entries(low, high, limit).cvt_err()?; + if ret.len() < limit.unwrap_or(usize::MAX).min((high - low) as usize) { return Err(RaftError::Store(StorageError::Unavailable)); } Ok(ret.into_iter().map(Into::into).collect())