diff --git a/src/mailbox.rs b/src/mailbox.rs index ee9ca37a5c747391d7d2f1fa61010e4b0375718d..d480f510755ff4dffe3d595fa0f4555f94a3acc6 100644 --- a/src/mailbox.rs +++ b/src/mailbox.rs @@ -31,7 +31,7 @@ impl<T> Mailbox<T> { self.0.cond.signal(); } - pub fn recv_timeout(&self, timeout: Duration) -> Vec<T> { + pub fn receive_all(&self, timeout: Duration) -> Vec<T> { if self.0.content.borrow().is_empty() { self.0.cond.wait_timeout(timeout); } diff --git a/src/traft/network.rs b/src/traft/network.rs index 372885416bc1937c027c31bb428d5fce7b9c6adb..06e95054c87caaab7e13a897ff89c0038f99fa41 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -79,12 +79,13 @@ impl PoolWorker { loop { // implicit yield - let messages = inbox.recv_timeout(opts.inactivity_timeout); + let messages = inbox.receive_all(opts.inactivity_timeout); if stop_flag.take().is_some() { return; } if messages.is_empty() { + // Connection has long been unused. Close it. cache.take(); continue; } diff --git a/src/traft/node.rs b/src/traft/node.rs index b7eb03a8b8c9ea2bee9856c446d47450d0327075..fa1d4efecbc982c0e1c14e9fad4915699d1d1c8f 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -243,7 +243,7 @@ fn raft_main_loop( // Clean up obsolete notifications notifications.retain(|_, notify: &mut Notify| !notify.is_closed()); - for req in main_inbox.recv_timeout(Node::TICK) { + for req in main_inbox.receive_all(Node::TICK) { match req { NormalRequest::ProposeNormal { op, notify } => { lc.inc(); @@ -525,7 +525,7 @@ impl AsTuple for JoinResponse {} fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<NormalRequest>) { loop { - let batch = inbox.recv_timeout(Duration::MAX); + let batch = inbox.receive_all(Duration::MAX); // TODO check leadership, else continue let (rx, tx) = fiber::Channel::new(1).into_clones();