diff --git a/src/cbus.cc b/src/cbus.cc
index 0d07ddae34d127219a3a8d2e45dee819cd0a8311..5615ed703c961ad44cf70e84eb634bf79f8ae847 100644
--- a/src/cbus.cc
+++ b/src/cbus.cc
@@ -169,7 +169,7 @@ cbus_flush_cb(ev_loop * /* loop */, struct ev_async *watcher,
 	bool peer_output_was_empty = STAILQ_EMPTY(&peer->output);
 
 	cbus_lock(pipe->bus);
-	pipe_was_empty = STAILQ_EMPTY(&pipe->pipe);
+	pipe_was_empty = !ev_async_pending(&pipe->fetch_output);
 	/** Flush input */
 	STAILQ_CONCAT(&pipe->pipe, &pipe->input);
 	/*
@@ -201,7 +201,7 @@ cpipe_peek_impl(struct cpipe *pipe)
 	cbus_lock(pipe->bus);
 	STAILQ_CONCAT(&pipe->output, &pipe->pipe);
 	if (! STAILQ_EMPTY(&peer->input)) {
-		peer_pipe_was_empty = STAILQ_EMPTY(&peer->pipe);
+		peer_pipe_was_empty = !ev_async_pending(&peer->fetch_output);
 		STAILQ_CONCAT(&peer->pipe, &peer->input);
 	}
 	cbus_unlock(pipe->bus);