diff --git a/changelogs/unreleased/gh-10846-fix-vinyl-replica-join-degradation-under-write-load.md b/changelogs/unreleased/gh-10846-fix-vinyl-replica-join-degradation-under-write-load.md
new file mode 100644
index 0000000000000000000000000000000000000000..2a42f121d9fe5a45a29fe407f97dd6b307755d7d
--- /dev/null
+++ b/changelogs/unreleased/gh-10846-fix-vinyl-replica-join-degradation-under-write-load.md
@@ -0,0 +1,5 @@
+## bugfix/vinyl
+
+* Fixed a bug when joining a new replica to a master instance that experiences
+  a heavy write load would severely degrade the master instance performance.
+  The fix should also speed up long-running scan requests (gh-10846).
diff --git a/src/box/vy_read_iterator.c b/src/box/vy_read_iterator.c
index 013d38ea280fe099a1f2212803abcef6a4035ee8..b47730e1ea3813b9f1e2735d1aa5f3722df3d7c5 100644
--- a/src/box/vy_read_iterator.c
+++ b/src/box/vy_read_iterator.c
@@ -59,6 +59,8 @@ struct vy_read_src {
 	bool is_last;
 	/** See vy_read_iterator->front_id. */
 	uint32_t front_id;
+	/** Max LSN that can be stored in this source. */
+	int64_t max_lsn;
 	/** History of the key the iterator is positioned at. */
 	struct vy_history history;
 };
@@ -104,6 +106,7 @@ vy_read_iterator_add_src(struct vy_read_iterator *itr)
 	}
 	struct vy_read_src *src = &itr->src[itr->src_count++];
 	memset(src, 0, sizeof(*src));
+	src->max_lsn = INT64_MAX;
 	vy_history_create(&src->history, &itr->lsm->env->history_node_pool);
 	return src;
 }
@@ -193,6 +196,24 @@ vy_read_iterator_cmp_stmt(struct vy_read_iterator *itr,
 		vy_entry_compare(a, b, itr->lsm->cmp_def);
 }
 
+/**
+ * Returns true if the given source can store statements visible from
+ * the read view used by the iterator.
+ */
+static inline bool
+vy_read_iterator_src_is_visible(struct vy_read_iterator *itr,
+				struct vy_read_src *src)
+{
+	uint32_t src_id = src - itr->src;
+	assert(src_id < itr->src_count);
+	/* The last source can store statements visible from any read view. */
+	if (src_id == itr->src_count - 1)
+		return true;
+	/* Sources are sorted by LSN so we check the next source's max LSN. */
+	struct vy_read_src *next_src = &itr->src[src_id + 1];
+	return (**itr->read_view).vlsn > next_src->max_lsn;
+}
+
 /**
  * Check if the statement at which the given read source
  * is positioned precedes the current candidate for the
@@ -205,6 +226,7 @@ vy_read_iterator_evaluate_src(struct vy_read_iterator *itr,
 			      struct vy_read_src *src,
 			      struct vy_entry *next, bool *stop)
 {
+	assert(src->is_started);
 	uint32_t src_id = src - itr->src;
 	struct vy_entry entry = vy_history_last_stmt(&src->history);
 	int cmp = vy_read_iterator_cmp_stmt(itr, entry, *next);
@@ -271,6 +293,7 @@ vy_read_iterator_reevaluate_srcs(struct vy_read_iterator *itr,
 		if (i >= itr->skipped_src)
 			break;
 		struct vy_read_src *src = &itr->src[i];
+		assert(src->is_started);
 		struct vy_entry entry = vy_history_last_stmt(&src->history);
 		int cmp = vy_read_iterator_cmp_stmt(itr, entry, *next);
 		if (cmp < 0) {
@@ -376,6 +399,9 @@ vy_read_iterator_scan_mem(struct vy_read_iterator *itr, uint32_t mem_src,
 
 	assert(mem_src >= itr->mem_src && mem_src < itr->disk_src);
 
+	if (!vy_read_iterator_src_is_visible(itr, src))
+		return 0;
+
 	rc = vy_mem_iterator_restore(src_itr, itr->last, &src->history);
 	if (rc == 0) {
 		if (!src->is_started || mem_src >= itr->skipped_src) {
@@ -414,6 +440,9 @@ vy_read_iterator_scan_disk(struct vy_read_iterator *itr, uint32_t disk_src,
 
 	assert(disk_src >= itr->disk_src && disk_src < itr->src_count);
 
+	if (!vy_read_iterator_src_is_visible(itr, src))
+		return 0;
+
 	if (!src->is_started || disk_src >= itr->skipped_src)
 		rc = vy_run_iterator_skip(src_itr, itr->last,
 					  &src->history);
@@ -442,6 +471,9 @@ vy_read_iterator_restore_mem(struct vy_read_iterator *itr,
 	struct vy_read_src *src = &itr->src[itr->mem_src];
 	struct vy_mem_iterator *src_itr = &src->mem_iterator;
 
+	if (!vy_read_iterator_src_is_visible(itr, src))
+		return 0;
+
 	/*
 	 * 'next' may refer to a statement in the memory source history,
 	 * which may be cleaned up by vy_mem_iterator_restore(), so we need
@@ -686,6 +718,7 @@ vy_read_iterator_add_mem(struct vy_read_iterator *itr, bool is_prepared_ok)
 				     &lsm->stat.memory.iterator,
 				     mem, iterator_type, itr->key,
 				     itr->read_view, is_prepared_ok);
+		sub_src->max_lsn = mem->dump_lsn;
 	}
 }
 
@@ -710,6 +743,7 @@ vy_read_iterator_add_disk(struct vy_read_iterator *itr)
 				     iterator_type, itr->key,
 				     itr->read_view, lsm->cmp_def,
 				     lsm->key_def, lsm->disk_format);
+		sub_src->max_lsn = slice->run->dump_lsn;
 	}
 }
 
diff --git a/test/vinyl-luatest/gh_10846_skip_invisible_read_src_test.lua b/test/vinyl-luatest/gh_10846_skip_invisible_read_src_test.lua
new file mode 100644
index 0000000000000000000000000000000000000000..6542a9f8d4c7ffde33ce82f699c3164ec5f1ebaf
--- /dev/null
+++ b/test/vinyl-luatest/gh_10846_skip_invisible_read_src_test.lua
@@ -0,0 +1,75 @@
+local server = require('luatest.server')
+local t = require('luatest')
+
+local g = t.group()
+
+g.before_all(function(cg)
+    t.tarantool.skip_if_not_debug()
+    cg.server = server:new()
+    cg.server:start()
+end)
+
+g.after_all(function(cg)
+    cg.server:drop()
+end)
+
+g.after_each(function(cg)
+    cg.server:exec(function()
+        box.error.injection.set('ERRINJ_VY_COMPACTION_DELAY', false)
+        if box.space.test ~= nil then
+            box.space.test:drop()
+        end
+    end)
+end)
+
+g.test_skip_invisible_read_src = function(cg)
+    cg.server:exec(function()
+        box.error.injection.set('ERRINJ_VY_COMPACTION_DELAY', true)
+
+        local s = box.schema.space.create('test', {engine = 'vinyl'})
+        local i = s:create_index('primary')
+
+        local function write(c)
+            box.begin()
+            for i = 101, 200 do
+                s:replace{i, c}
+            end
+            box.commit()
+        end
+
+        write(1)
+        box.snapshot()
+        write(2)
+
+        local gen, param, state = i:pairs()
+        local _, tuple = gen(param, state)
+        t.assert_equals(tuple, {101, 2})
+
+        t.assert_covers(i:stat(), {
+            range_count = 1,
+            run_count = 1,
+            memory = {iterator = {lookup = 1, get = {rows = 1}}},
+            disk = {iterator = {lookup = 1, get = {rows = 1}}},
+        })
+
+        box.snapshot()
+        write(3)
+        box.snapshot()
+        write(4)
+
+        box.stat.reset()
+
+        -- The iterator must be sent to a read view.
+        local _, tuple = gen(param, state)
+        t.assert_equals(tuple, {102, 2})
+
+        -- The iterator must skip the memory level and the most recent run
+        -- because they were created after the read view.
+        t.assert_covers(i:stat(), {
+            range_count = 1,
+            run_count = 3,
+            memory = {iterator = {lookup = 0, get = {rows = 0}}},
+            disk = {iterator = {lookup = 2, get = {rows = 2}}},
+        })
+    end)
+end