From 984aa4a6d9093f7bb0a02f229157a652c6492549 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Fri, 21 Apr 2023 18:59:35 +0300 Subject: [PATCH] refactor: framework for wait_lsn as when applying a raft entry --- src/traft/node.rs | 192 +++++++++++++++++++++++++++++++----------- src/traft/rpc/lsn.rs | 16 ++++ src/traft/rpc/mod.rs | 1 + src/traft/rpc/sync.rs | 2 + 4 files changed, 164 insertions(+), 47 deletions(-) create mode 100644 src/traft/rpc/lsn.rs diff --git a/src/traft/node.rs b/src/traft/node.rs index 3346c8da29..a8a6f075f8 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -21,7 +21,7 @@ use crate::traft::event; use crate::traft::event::Event; use crate::traft::notify::{notification, Notifier, Notify}; use crate::traft::op::{Dml, Op, OpResult, PersistInstance}; -use crate::traft::rpc::{join, update_instance}; +use crate::traft::rpc::{join, lsn, update_instance}; use crate::traft::Address; use crate::traft::ConnectionPool; use crate::traft::ContextCoercion as _; @@ -635,14 +635,18 @@ impl NodeImpl { } /// Is called during a transaction + /// + /// Returns an error if the transaction applying an entry fails.
fn handle_committed_entries( &mut self, entries: &[raft::Entry], wake_governor: &mut bool, expelled: &mut bool, storage_changes: &mut StorageChanges, - ) { - for entry in entries { + ) -> traft::Result<()> { + let mut entries = entries.iter().peekable(); + + while let Some(&entry) = entries.peek() { let entry = match traft::Entry::try_from(entry) { Ok(v) => v, Err(e) => { @@ -651,40 +655,73 @@ impl NodeImpl { } }; - match entry.entry_type { - raft::EntryType::EntryNormal => self.handle_committed_normal_entry( - entry, - wake_governor, - expelled, - storage_changes, - ), - raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { - self.handle_committed_conf_change(entry) + let mut wait_lsn = false; + start_transaction(|| -> tarantool::Result<()> { + let entry_index = entry.index; + match entry.entry_type { + raft::EntryType::EntryNormal => { + wait_lsn = self.handle_committed_normal_entry( + entry, + wake_governor, + expelled, + storage_changes, + ); + if wait_lsn { + return Ok(()); + } + } + raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { + self.handle_committed_conf_change(entry) + } } - } - } - if let Some(last_entry) = entries.last() { - if let Err(e) = self.raft_storage.persist_applied(last_entry.index) { - tlog!( - Error, - "error persisting applied index: {e}"; - "index" => last_entry.index - ); - } else { - event::broadcast(Event::RaftEntryApplied); + let res = self.raft_storage.persist_applied(entry_index); + if let Err(e) = res { + tlog!( + Error, + "error persisting applied index: {e}"; + "index" => entry_index + ); + } else { + event::broadcast(Event::RaftEntryApplied); + } + + Ok(()) + })?; + + if wait_lsn { + // TODO: this shouldn't ever happen for a raft leader, + // but what if it does? + // TODO: What if we get elected leader after wait_lsn?
+ if let Err(e) = self.wait_lsn() { + let timeout = MainLoop::TICK; + tlog!( + Warning, + "failed syncing with replication master: {e}, retrying in {:?}...", + timeout + ); + fiber::sleep(timeout); + } + continue; } + + // Actually advance the iterator. + let _ = entries.next(); } + + Ok(()) } /// Is called during a transaction + /// + /// Returns `true` if wait_lsn is needed in `advance`. fn handle_committed_normal_entry( &mut self, entry: traft::Entry, wake_governor: &mut bool, expelled: &mut bool, storage_changes: &mut StorageChanges, - ) { + ) -> bool { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); let lc = entry.lc(); let index = entry.index; @@ -692,6 +729,14 @@ impl NodeImpl { tlog!(Debug, "applying entry: {op}"; "index" => index); match &op { + Op::DdlCommit => { + // TODO: + // if box.space._schema:get('pico_schema_change') < + // pico.space.property:get('pending_schema_version') + // then + // return true -- wait_lsn + todo!(); + } Op::PersistInstance(PersistInstance(instance)) => { *wake_governor = true; storage_changes.insert(ClusterwideSpace::Instance.into()); @@ -730,6 +775,8 @@ impl NodeImpl { let _ = notify.send(Err(e)); event::broadcast(Event::JointStateDrop); } + + false } fn apply_op(&self, op: Op) -> traft::Result<Box<dyn AnyWithTypeName>> { @@ -917,15 +964,19 @@ impl NodeImpl { self.handle_read_states(ready.read_states()); - if let Err(e) = start_transaction(|| -> Result<(), TransactionError> { - // Apply committed entries. - self.handle_committed_entries( - ready.committed_entries(), - wake_governor, - expelled, - storage_changes, - ); + // Apply committed entries. 
+ let res = self.handle_committed_entries( + ready.committed_entries(), + wake_governor, + expelled, + storage_changes, + ); + if let Err(e) = res { + tlog!(Warning, "dropping raft ready: {ready:#?}"); + panic!("transaction failed: {e}, {}", TarantoolError::last()); + } + if let Err(e) = start_transaction(|| -> Result<(), TransactionError> { // Persist uncommitted entries in the raft log. self.raft_storage.persist_entries(ready.entries()).unwrap(); @@ -954,22 +1005,19 @@ impl NodeImpl { // Send out messages to the other nodes. self.handle_messages(light_rd.take_messages()); - if let Err(e) = start_transaction(|| -> Result<(), TransactionError> { - // Update commit index. - if let Some(commit) = light_rd.commit_index() { - self.raft_storage.persist_commit(commit).unwrap(); - } - - // Apply committed entries. - self.handle_committed_entries( - light_rd.committed_entries(), - wake_governor, - expelled, - storage_changes, - ); + // Update commit index. + if let Some(commit) = light_rd.commit_index() { + self.raft_storage.persist_commit(commit).unwrap(); + } - Ok(()) - }) { + // Apply committed entries. 
+ let res = self.handle_committed_entries( + light_rd.committed_entries(), + wake_governor, + expelled, + storage_changes, + ); + if let Err(e) = res { tlog!(Warning, "dropping raft light ready: {light_rd:#?}"); panic!("transaction failed: {e}, {}", TarantoolError::last()); } @@ -978,6 +1026,56 @@ impl NodeImpl { self.raw_node.advance_apply(); } + fn wait_lsn(&mut self) -> traft::Result<()> { + assert!(self.raw_node.raft.state != RaftStateRole::Leader); + + let leader_id = self.raw_node.raft.leader_id; + let my_id = self.raw_node.raft.id; + + let resp = fiber::block_on(self.pool.call(&leader_id, &lsn::Request {})?)?; + let target_lsn = resp.lsn; + + let replicaset_id = self.storage.instances.get(&my_id)?.replicaset_id; + let replicaset = self.storage.replicasets.get(&replicaset_id)?; + let replicaset = replicaset.ok_or_else(|| { + Error::other(format!("replicaset info for id {replicaset_id} not found")) + })?; + let master = self.storage.instances.get(&replicaset.master_id)?; + let master_uuid = master.instance_uuid; + let mut current_lsn = None; + + #[derive(tlua::LuaRead)] + struct ReplicationInfo { + lsn: u64, + uuid: String, + } + let replication: HashMap<u64, ReplicationInfo> = + crate::tarantool::eval("return box.info.replication")?; + for r in replication.values() { + if r.uuid != master_uuid { + continue; + } + current_lsn = Some(r.lsn); + break; + } + + let current_lsn = unwrap_some_or!(current_lsn, { + return Err(Error::other(format!( + "replication info is unavailable for instance with uuid \"{master_uuid}\"" + ))); + }); + + if current_lsn < target_lsn { + tlog!(Info, "blocking raft loop until replication progresses"; + "target_lsn" => target_lsn, + "current_lsn" => current_lsn, + ); + fiber::sleep(MainLoop::TICK * 4); + } + + Ok(()) + } + #[inline] fn cleanup_notifications(&mut self) { self.notifications.retain(|_, notify| !notify.is_closed()); diff --git a/src/traft/rpc/lsn.rs b/src/traft/rpc/lsn.rs new file mode 100644 index 0000000000..4e08272997 
--- /dev/null +++ b/src/traft/rpc/lsn.rs @@ -0,0 +1,16 @@ +use crate::traft::Result; + +crate::define_rpc_request! { + fn proc_get_lsn(req: Request) -> Result<Response> { + let _ = req; + let lsn = crate::tarantool::eval("return box.info.lsn")?; + Ok(Response { lsn }) + } + + pub struct Request { + } + + pub struct Response { + pub lsn: u64, + } +} diff --git a/src/traft/rpc/mod.rs b/src/traft/rpc/mod.rs index 9f0a680b3b..108559d6ca 100644 --- a/src/traft/rpc/mod.rs +++ b/src/traft/rpc/mod.rs @@ -14,6 +14,7 @@ use serde::de::DeserializeOwned; pub mod cas; pub mod expel; pub mod join; +pub mod lsn; pub mod migration; pub mod replication; pub mod sharding; diff --git a/src/traft/rpc/sync.rs b/src/traft/rpc/sync.rs index 33e151af1c..3960f1a701 100644 --- a/src/traft/rpc/sync.rs +++ b/src/traft/rpc/sync.rs @@ -36,6 +36,8 @@ pub fn wait_for_index_timeout( } if let Some(timeout) = deadline.checked_duration_since(Instant::now()) { + // TODO: this assumes applied index is updated after commit index, + // maybe we should be more explicit about what we're waiting for event::wait_timeout(event::Event::RaftEntryApplied, timeout)?; } else { return Err(Error::Timeout); -- GitLab