diff --git a/src/traft/node.rs b/src/traft/node.rs
index 3346c8da290211806e30aebe9cd2285c17efcce6..a8a6f075f8022a51b339b18b05854eb3b2be4252 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -21,7 +21,7 @@ use crate::traft::event;
 use crate::traft::event::Event;
 use crate::traft::notify::{notification, Notifier, Notify};
 use crate::traft::op::{Dml, Op, OpResult, PersistInstance};
-use crate::traft::rpc::{join, update_instance};
+use crate::traft::rpc::{join, lsn, update_instance};
 use crate::traft::Address;
 use crate::traft::ConnectionPool;
 use crate::traft::ContextCoercion as _;
@@ -635,14 +635,18 @@ impl NodeImpl {
     }
 
     /// Is called during a transaction
+    ///
+    /// Returns `true` if wait_lsn is needed in `advance`.
     fn handle_committed_entries(
         &mut self,
         entries: &[raft::Entry],
         wake_governor: &mut bool,
         expelled: &mut bool,
         storage_changes: &mut StorageChanges,
-    ) {
-        for entry in entries {
+    ) -> traft::Result<()> {
+        let mut entries = entries.iter().peekable();
+
+        while let Some(&entry) = entries.peek() {
             let entry = match traft::Entry::try_from(entry) {
                 Ok(v) => v,
                 Err(e) => {
@@ -651,40 +655,73 @@ impl NodeImpl {
                 }
             };
 
-            match entry.entry_type {
-                raft::EntryType::EntryNormal => self.handle_committed_normal_entry(
-                    entry,
-                    wake_governor,
-                    expelled,
-                    storage_changes,
-                ),
-                raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
-                    self.handle_committed_conf_change(entry)
+            let mut wait_lsn = false;
+            start_transaction(|| -> tarantool::Result<()> {
+                let entry_index = entry.index;
+                match entry.entry_type {
+                    raft::EntryType::EntryNormal => {
+                        wait_lsn = self.handle_committed_normal_entry(
+                            entry,
+                            wake_governor,
+                            expelled,
+                            storage_changes,
+                        );
+                        if wait_lsn {
+                            return Ok(());
+                        }
+                    }
+                    raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
+                        self.handle_committed_conf_change(entry)
+                    }
                 }
-            }
-        }
 
-        if let Some(last_entry) = entries.last() {
-            if let Err(e) = self.raft_storage.persist_applied(last_entry.index) {
-                tlog!(
-                    Error,
-                    "error persisting applied index: {e}";
-                    "index" => last_entry.index
-                );
-            } else {
-                event::broadcast(Event::RaftEntryApplied);
+                let res = self.raft_storage.persist_applied(entry_index);
+                if let Err(e) = res {
+                    tlog!(
+                        Error,
+                        "error persisting applied index: {e}";
+                        "index" => entry_index
+                    );
+                } else {
+                    event::broadcast(Event::RaftEntryApplied);
+                }
+
+                Ok(())
+            })?;
+
+            if wait_lsn {
+                // TODO: this shouldn't ever happen for a raft leader,
+                // but what if it does?
+                // TODO: What if about we get elected leader after wait_lsn?
+                if let Err(e) = self.wait_lsn() {
+                    let timeout = MainLoop::TICK;
+                    tlog!(
+                        Warning,
+                        "failed syncing with replication master: {e}, retrying in {:?}...",
+                        timeout
+                    );
+                    fiber::sleep(timeout);
+                }
+                continue;
             }
+
+            // Actually advance the iterator.
+            let _ = entries.next();
         }
+
+        Ok(())
     }
 
     /// Is called during a transaction
+    ///
+    /// Returns `true` if wait_lsn is needed in `advance`.
     fn handle_committed_normal_entry(
         &mut self,
         entry: traft::Entry,
         wake_governor: &mut bool,
         expelled: &mut bool,
         storage_changes: &mut StorageChanges,
-    ) {
+    ) -> bool {
         assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
         let lc = entry.lc();
         let index = entry.index;
@@ -692,6 +729,14 @@ impl NodeImpl {
         tlog!(Debug, "applying entry: {op}"; "index" => index);
 
         match &op {
+            Op::DdlCommit => {
+                // TODO:
+                // if box.space._schema:get('pico_schema_change') <
+                //    pico.space.property:get('pending_schema_version')
+                // then
+                //    return true -- wait_lsn
+                todo!();
+            }
             Op::PersistInstance(PersistInstance(instance)) => {
                 *wake_governor = true;
                 storage_changes.insert(ClusterwideSpace::Instance.into());
@@ -730,6 +775,8 @@ impl NodeImpl {
             let _ = notify.send(Err(e));
             event::broadcast(Event::JointStateDrop);
         }
+
+        false
     }
 
     fn apply_op(&self, op: Op) -> traft::Result<Box<dyn AnyWithTypeName>> {
@@ -917,15 +964,19 @@ impl NodeImpl {
 
         self.handle_read_states(ready.read_states());
 
-        if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
-            // Apply committed entries.
-            self.handle_committed_entries(
-                ready.committed_entries(),
-                wake_governor,
-                expelled,
-                storage_changes,
-            );
+        // Apply committed entries.
+        let res = self.handle_committed_entries(
+            ready.committed_entries(),
+            wake_governor,
+            expelled,
+            storage_changes,
+        );
+        if let Err(e) = res {
+            tlog!(Warning, "dropping raft ready: {ready:#?}");
+            panic!("transaction failed: {e}, {}", TarantoolError::last());
+        }
 
+        if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
             // Persist uncommitted entries in the raft log.
             self.raft_storage.persist_entries(ready.entries()).unwrap();
 
@@ -954,22 +1005,19 @@ impl NodeImpl {
         // Send out messages to the other nodes.
         self.handle_messages(light_rd.take_messages());
 
-        if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
-            // Update commit index.
-            if let Some(commit) = light_rd.commit_index() {
-                self.raft_storage.persist_commit(commit).unwrap();
-            }
-
-            // Apply committed entries.
-            self.handle_committed_entries(
-                light_rd.committed_entries(),
-                wake_governor,
-                expelled,
-                storage_changes,
-            );
+        // Update commit index.
+        if let Some(commit) = light_rd.commit_index() {
+            self.raft_storage.persist_commit(commit).unwrap();
+        }
 
-            Ok(())
-        }) {
+        // Apply committed entries.
+        let res = self.handle_committed_entries(
+            light_rd.committed_entries(),
+            wake_governor,
+            expelled,
+            storage_changes,
+        );
+        if let Err(e) = res {
             tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
             panic!("transaction failed: {e}, {}", TarantoolError::last());
         }
@@ -978,6 +1026,56 @@ impl NodeImpl {
         self.raw_node.advance_apply();
     }
 
+    fn wait_lsn(&mut self) -> traft::Result<()> {
+        assert!(self.raw_node.raft.state != RaftStateRole::Leader);
+
+        let leader_id = self.raw_node.raft.leader_id;
+        let my_id = self.raw_node.raft.id;
+
+        let resp = fiber::block_on(self.pool.call(&leader_id, &lsn::Request {})?)?;
+        let target_lsn = resp.lsn;
+
+        let replicaset_id = self.storage.instances.get(&my_id)?.replicaset_id;
+        let replicaset = self.storage.replicasets.get(&replicaset_id)?;
+        let replicaset = replicaset.ok_or_else(|| {
+            Error::other(format!("replicaset info for id {replicaset_id} not found"))
+        })?;
+        let master = self.storage.instances.get(&replicaset.master_id)?;
+        let master_uuid = master.instance_uuid;
+        let mut current_lsn = None;
+
+        #[derive(tlua::LuaRead)]
+        struct ReplicationInfo {
+            lsn: u64,
+            uuid: String,
+        }
+        let replication: HashMap<u64, ReplicationInfo> =
+            crate::tarantool::eval("return box.info.replication")?;
+        for r in replication.values() {
+            if r.uuid != master_uuid {
+                continue;
+            }
+            current_lsn = Some(r.lsn);
+            break;
+        }
+
+        let current_lsn = unwrap_some_or!(current_lsn, {
+            return Err(Error::other(format!(
+                "replication info is unavailable for instance with uuid \"{master_uuid}\""
+            )));
+        });
+
+        if current_lsn < target_lsn {
+            tlog!(Info, "blocking raft loop until replication progresses";
+                "target_lsn" => target_lsn,
+                "current_lsn" => current_lsn,
+            );
+            fiber::sleep(MainLoop::TICK * 4);
+        }
+
+        Ok(())
+    }
+
     #[inline]
     fn cleanup_notifications(&mut self) {
         self.notifications.retain(|_, notify| !notify.is_closed());
diff --git a/src/traft/rpc/lsn.rs b/src/traft/rpc/lsn.rs
new file mode 100644
index 0000000000000000000000000000000000000000..4e08272997c3496f19334839b9eecffc9699523b
--- /dev/null
+++ b/src/traft/rpc/lsn.rs
@@ -0,0 +1,16 @@
+use crate::traft::Result;
+
+crate::define_rpc_request! {
+    fn proc_get_lsn(req: Request) -> Result<Response> {
+        let _ = req;
+        let lsn = crate::tarantool::eval("return box.info.lsn")?;
+        Ok(Response { lsn })
+    }
+
+    pub struct Request {
+    }
+
+    pub struct Response {
+        pub lsn: u64,
+    }
+}
diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs
index 9f0a680b3bbe4b8f6c254986ad5d26e9c5e3c6f3..108559d6cae2a83e8e97a401b104567a05c4fde9 100644
--- a/src/traft/rpc/mod.rs
+++ b/src/traft/rpc/mod.rs
@@ -14,6 +14,7 @@ use serde::de::DeserializeOwned;
 pub mod cas;
 pub mod expel;
 pub mod join;
+pub mod lsn;
 pub mod migration;
 pub mod replication;
 pub mod sharding;
diff --git a/src/traft/rpc/sync.rs b/src/traft/rpc/sync.rs
index 33e151af1c9434cba585ce3b7638714e1ce6d058..3960f1a701e64e06f530f803b2fed778e19cb56d 100644
--- a/src/traft/rpc/sync.rs
+++ b/src/traft/rpc/sync.rs
@@ -36,6 +36,8 @@ pub fn wait_for_index_timeout(
         }
 
         if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
+            // TODO: this assumes applied index is updated after committe index,
+            // maybe we should be more explicit about what we're waiting for
             event::wait_timeout(event::Event::RaftEntryApplied, timeout)?;
         } else {
             return Err(Error::Timeout);