Commit 984aa4a6 authored by Georgy Moshkin

refactor: framework for wait_lsn when applying a raft entry

parent f4665cbb
1 merge request: !516 OpDdl create space
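
The diff below restructures the committed-entry apply loop so that applying an entry can be postponed until the local replica catches up with its replication master (wait_lsn). A minimal sketch of the retry idea in plain Rust, independent of the picodata types (apply_entry and sync_with_master are illustrative stand-ins, not the actual API):

    // Sketch only. Ok(true) from apply_entry means "wait_lsn is needed
    // before this entry can take effect on this instance".
    fn apply_committed<E>(
        entries: &[E],
        mut apply_entry: impl FnMut(&E) -> Result<bool, String>,
        mut sync_with_master: impl FnMut() -> Result<(), String>,
    ) -> Result<(), String> {
        let mut entries = entries.iter().peekable();
        while let Some(&entry) = entries.peek() {
            if apply_entry(entry)? {
                // Don't consume the entry: sync up, then retry the same one.
                sync_with_master()?;
                continue;
            }
            // Entry fully applied, actually advance the iterator.
            let _ = entries.next();
        }
        Ok(())
    }
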
@@ -21,7 +21,7 @@ use crate::traft::event;
use crate::traft::event::Event;
use crate::traft::notify::{notification, Notifier, Notify};
use crate::traft::op::{Dml, Op, OpResult, PersistInstance};
use crate::traft::rpc::{join, update_instance};
use crate::traft::rpc::{join, lsn, update_instance};
use crate::traft::Address;
use crate::traft::ConnectionPool;
use crate::traft::ContextCoercion as _;
@@ -635,14 +635,18 @@ impl NodeImpl {
}
/// Is called during a transaction
///
/// Returns `true` if wait_lsn is needed in `advance`.
fn handle_committed_entries(
&mut self,
entries: &[raft::Entry],
wake_governor: &mut bool,
expelled: &mut bool,
storage_changes: &mut StorageChanges,
) {
for entry in entries {
) -> traft::Result<()> {
let mut entries = entries.iter().peekable();
while let Some(&entry) = entries.peek() {
let entry = match traft::Entry::try_from(entry) {
Ok(v) => v,
Err(e) => {
@@ -651,40 +655,73 @@ impl NodeImpl {
}
};
match entry.entry_type {
raft::EntryType::EntryNormal => self.handle_committed_normal_entry(
entry,
wake_governor,
expelled,
storage_changes,
),
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
self.handle_committed_conf_change(entry)
let mut wait_lsn = false;
start_transaction(|| -> tarantool::Result<()> {
let entry_index = entry.index;
match entry.entry_type {
raft::EntryType::EntryNormal => {
wait_lsn = self.handle_committed_normal_entry(
entry,
wake_governor,
expelled,
storage_changes,
);
if wait_lsn {
return Ok(());
}
}
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
self.handle_committed_conf_change(entry)
}
}
}
}
if let Some(last_entry) = entries.last() {
if let Err(e) = self.raft_storage.persist_applied(last_entry.index) {
tlog!(
Error,
"error persisting applied index: {e}";
"index" => last_entry.index
);
} else {
event::broadcast(Event::RaftEntryApplied);
let res = self.raft_storage.persist_applied(entry_index);
if let Err(e) = res {
tlog!(
Error,
"error persisting applied index: {e}";
"index" => entry_index
);
} else {
event::broadcast(Event::RaftEntryApplied);
}
Ok(())
})?;
if wait_lsn {
// TODO: this shouldn't ever happen for a raft leader,
// but what if it does?
// TODO: what if we get elected leader after wait_lsn?
if let Err(e) = self.wait_lsn() {
let timeout = MainLoop::TICK;
tlog!(
Warning,
"failed syncing with replication master: {e}, retrying in {:?}...",
timeout
);
fiber::sleep(timeout);
}
continue;
}
// Actually advance the iterator.
let _ = entries.next();
}
Ok(())
}
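
A note on the transaction boundaries in the loop above: each entry's side effects and the persist_applied bump for its index now happen inside a single start_transaction call, so after a crash the applied index can never disagree with the entry's effects. Condensed, mirroring the calls above (apply_side_effects is a hypothetical stand-in for the match on entry_type, and error propagation is simplified):

    start_transaction(|| -> tarantool::Result<()> {
        apply_side_effects(&entry)?;                       // DML, conf changes, ...
        self.raft_storage.persist_applied(entry.index)?;   // same transaction
        Ok(())
    })?;
    // Dying anywhere in between rolls the whole step back, and the
    // entry is simply re-applied after restart.
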
/// Is called during a transaction
///
/// Returns `true` if wait_lsn is needed in `advance`.
fn handle_committed_normal_entry(
&mut self,
entry: traft::Entry,
wake_governor: &mut bool,
expelled: &mut bool,
storage_changes: &mut StorageChanges,
) {
) -> bool {
assert_eq!(entry.entry_type, raft::EntryType::EntryNormal);
let lc = entry.lc();
let index = entry.index;
@@ -692,6 +729,14 @@ impl NodeImpl {
tlog!(Debug, "applying entry: {op}"; "index" => index);
match &op {
Op::DdlCommit => {
// TODO:
// if box.space._schema:get('pico_schema_change') <
// pico.space.property:get('pending_schema_version')
// then
// return true -- wait_lsn
todo!();
}
Op::PersistInstance(PersistInstance(instance)) => {
*wake_governor = true;
storage_changes.insert(ClusterwideSpace::Instance.into());
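
The TODO above sketches the intended check in Lua. A hedged Rust rendition of the same comparison, built on the crate::tarantool::eval helper that wait_lsn uses below (space and key names are copied from the TODO and may not be final; tuple unpacking and nil handling are glossed over):

    // Hypothetical: true means the pending schema change hasn't reached
    // this read-only replica via Tarantool replication yet, so applying
    // Op::DdlCommit must wait (wait_lsn).
    fn ddl_commit_needs_wait_lsn() -> traft::Result<bool> {
        let applied: u64 = crate::tarantool::eval(
            "return box.space._schema:get('pico_schema_change')[2]",
        )?;
        let pending: u64 = crate::tarantool::eval(
            "return pico.space.property:get('pending_schema_version')[2]",
        )?;
        Ok(applied < pending)
    }
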
@@ -730,6 +775,8 @@ impl NodeImpl {
let _ = notify.send(Err(e));
event::broadcast(Event::JointStateDrop);
}
false
}
fn apply_op(&self, op: Op) -> traft::Result<Box<dyn AnyWithTypeName>> {
@@ -917,15 +964,19 @@ impl NodeImpl {
self.handle_read_states(ready.read_states());
if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
// Apply committed entries.
self.handle_committed_entries(
ready.committed_entries(),
wake_governor,
expelled,
storage_changes,
);
// Apply committed entries.
let res = self.handle_committed_entries(
ready.committed_entries(),
wake_governor,
expelled,
storage_changes,
);
if let Err(e) = res {
tlog!(Warning, "dropping raft ready: {ready:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
// Persist uncommitted entries in the raft log.
self.raft_storage.persist_entries(ready.entries()).unwrap();
@@ -954,22 +1005,19 @@ impl NodeImpl {
// Send out messages to the other nodes.
self.handle_messages(light_rd.take_messages());
if let Err(e) = start_transaction(|| -> Result<(), TransactionError> {
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
self.raft_storage.persist_commit(commit).unwrap();
}
// Apply committed entries.
self.handle_committed_entries(
light_rd.committed_entries(),
wake_governor,
expelled,
storage_changes,
);
// Update commit index.
if let Some(commit) = light_rd.commit_index() {
self.raft_storage.persist_commit(commit).unwrap();
}
Ok(())
}) {
// Apply committed entries.
let res = self.handle_committed_entries(
light_rd.committed_entries(),
wake_governor,
expelled,
storage_changes,
);
if let Err(e) = res {
tlog!(Warning, "dropping raft light ready: {light_rd:#?}");
panic!("transaction failed: {e}, {}", TarantoolError::last());
}
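
Same inversion for the light ready above: the commit index is still persisted inside the transaction, but committed entries are now applied afterwards, outside of it, because handle_committed_entries opens its own per-entry transactions and may sleep in wait_lsn. Failure is again fail-stop: with entries half-applied, storage can no longer be trusted to match the raft log, so the node crashes and recovers from persistent state. In outline:

    // 1. start_transaction: persist light_rd.commit_index(), if any
    // 2. after the transaction: self.handle_committed_entries(...)
    //    (each entry gets its own transaction inside)
    // 3. on error: log the ready and panic => restart and recover
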
@@ -978,6 +1026,56 @@ impl NodeImpl {
self.raw_node.advance_apply();
}
fn wait_lsn(&mut self) -> traft::Result<()> {
assert!(self.raw_node.raft.state != RaftStateRole::Leader);
let leader_id = self.raw_node.raft.leader_id;
let my_id = self.raw_node.raft.id;
let resp = fiber::block_on(self.pool.call(&leader_id, &lsn::Request {})?)?;
let target_lsn = resp.lsn;
let replicaset_id = self.storage.instances.get(&my_id)?.replicaset_id;
let replicaset = self.storage.replicasets.get(&replicaset_id)?;
let replicaset = replicaset.ok_or_else(|| {
Error::other(format!("replicaset info for id {replicaset_id} not found"))
})?;
let master = self.storage.instances.get(&replicaset.master_id)?;
let master_uuid = master.instance_uuid;
let mut current_lsn = None;
#[derive(tlua::LuaRead)]
struct ReplicationInfo {
lsn: u64,
uuid: String,
}
let replication: HashMap<u64, ReplicationInfo> =
crate::tarantool::eval("return box.info.replication")?;
for r in replication.values() {
if r.uuid != master_uuid {
continue;
}
current_lsn = Some(r.lsn);
break;
}
let current_lsn = unwrap_some_or!(current_lsn, {
return Err(Error::other(format!(
"replication info is unavailable for instance with uuid \"{master_uuid}\""
)));
});
if current_lsn < target_lsn {
tlog!(Info, "blocking raft loop until replication progresses";
"target_lsn" => target_lsn,
"current_lsn" => current_lsn,
);
fiber::sleep(MainLoop::TICK * 4);
}
Ok(())
}
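
One detail worth spelling out about the box.info.replication scan above: Tarantool keys that table by replica id, which is unrelated to the raft instance id, so the master's row has to be located by instance uuid rather than by key. A trimmed example of the shape being deserialized into HashMap<u64, ReplicationInfo> (real rows carry more fields):

    -- box.info.replication, as seen from Lua:
    -- {
    --     [1] = { id = 1, uuid = "aaaa-...", lsn = 42, ... },
    --     [2] = { id = 2, uuid = "bbbb-...", lsn = 40, ... },
    -- }

Also note that wait_lsn itself performs a single bounded probe: one RPC for the target lsn, one replication lookup, at most one short sleep. It is the retry loop in handle_committed_entries that turns this into "block until synced".
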
#[inline]
fn cleanup_notifications(&mut self) {
self.notifications.retain(|_, notify| !notify.is_closed());
......
use crate::traft::Result;
crate::define_rpc_request! {
fn proc_get_lsn(req: Request) -> Result<Response> {
let _ = req;
let lsn = crate::tarantool::eval("return box.info.lsn")?;
Ok(Response { lsn })
}
pub struct Request {
}
pub struct Response {
pub lsn: u64,
}
}
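
This is the server side of the new lsn RPC: define_rpc_request (the crate's RPC macro) presumably registers proc_get_lsn as a stored procedure and ties the Request and Response types together. The client side is the single call already visible in NodeImpl::wait_lsn above:

    // Ask the raft leader for its current lsn and use it as the
    // replication target to wait for.
    let resp = fiber::block_on(self.pool.call(&leader_id, &lsn::Request {})?)?;
    let target_lsn = resp.lsn;
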
@@ -14,6 +14,7 @@ use serde::de::DeserializeOwned;
pub mod cas;
pub mod expel;
pub mod join;
pub mod lsn;
pub mod migration;
pub mod replication;
pub mod sharding;
......
@@ -36,6 +36,8 @@ pub fn wait_for_index_timeout(
}
if let Some(timeout) = deadline.checked_duration_since(Instant::now()) {
// TODO: this assumes applied index is updated after commit index,
// maybe we should be more explicit about what we're waiting for
event::wait_timeout(event::Event::RaftEntryApplied, timeout)?;
} else {
return Err(Error::Timeout);
......
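
Regarding the TODO above: waiting on Event::RaftEntryApplied is sound today because node.rs broadcasts that event right after persist_applied succeeds, so applied-index progress is observable through it. The "more explicit" variant the TODO asks for might poll the index directly (sketch; get_applied_index is a hypothetical accessor for the value persist_applied writes):

    // Hypothetical: wait until the persisted applied index reaches `target`.
    fn wait_applied(target: u64, deadline: Instant) -> Result<()> {
        while get_applied_index()? < target {
            match deadline.checked_duration_since(Instant::now()) {
                Some(timeout) => {
                    event::wait_timeout(event::Event::RaftEntryApplied, timeout)?
                }
                None => return Err(Error::Timeout),
            }
        }
        Ok(())
    }
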