Skip to content
Snippets Groups Projects
Commit 61306ea3 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Yaroslav Dynnikov
Browse files

refactor: support passing additional Dml entries to update_instance_request

parent d8ffec81
No related branches found
No related tags found
1 merge request!1156fix: store just the vshard config version
......@@ -96,8 +96,26 @@ impl Request {
/// Returns `Ok(())` when the entry is committed.
///
/// **This function yields**
// TODO: for this function to be async and have an outer timeout wait_* fns need to be async
#[inline(always)]
pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration) -> Result<()> {
handle_update_instance_request_in_governor_and_also_wait_too(req, None, timeout)
}
/// Processes the [`crate::rpc::update_instance::Request`] and appends
/// the corresponding operation along with the provided `additional_dml` entry
/// to the raft log within a single [`Op::BatchDml`] (if successful).
///
/// **This function should be used directly only from governor** everywhere else
/// should use [`handle_update_instance_request_and_wait`] instead.
///
/// Returns `Ok(())` when the entry is committed.
///
/// **This function yields**
pub fn handle_update_instance_request_in_governor_and_also_wait_too(
req: Request,
additional_dml: Option<&Dml>,
timeout: Duration,
) -> Result<()> {
let node = node::global()?;
let cluster_id = node.raft_storage.cluster_id()?;
let storage = &node.storage;
......@@ -111,9 +129,15 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
});
}
#[cfg(debug_assertions)]
if let Some(op) = additional_dml {
debug_assert_eq!(op.table_id(), ClusterwideTable::Property.id(), "for CaS safety reasons currently we only allow updating _pico_property simultaneously with instance");
}
let deadline = fiber::clock().saturating_add(timeout);
loop {
let old_instance = storage.instances.get(&req.instance_id)?;
// TODO: this should instead be a Dml::Update operation with just the fields which are changed
let mut new_instance = old_instance.clone();
update_instance(&mut new_instance, &req, storage).map_err(raft::Error::ConfChangeError)?;
if old_instance == new_instance {
......@@ -127,6 +151,14 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
let dml = Dml::replace(ClusterwideTable::Instance, &new_instance, ADMIN_ID)
.expect("encoding should not fail");
let op = match additional_dml {
// TODO: to eliminate redundant copies here we should refactor `BatchDml` and/or `Dml`
Some(additional_dml) => Op::BatchDml {
ops: vec![dml, additional_dml.clone()],
},
// No need to wrap it in a BatchDml and confuse the users
None => Op::Dml(dml),
};
let ranges = vec![
cas::Range::new(ClusterwideTable::Instance),
......@@ -135,7 +167,7 @@ pub fn handle_update_instance_request_and_wait(req: Request, timeout: Duration)
];
let cas_req = crate::cas::Request::new(
Op::Dml(dml),
op,
cas::Predicate {
index: raft_storage.applied()?,
term: raft_storage.term()?,
......
......@@ -426,6 +426,15 @@ pub enum Dml {
}
impl Dml {
pub fn table_id(&self) -> SpaceId {
match *self {
Dml::Insert { table, .. } => table,
Dml::Replace { table, .. } => table,
Dml::Update { table, .. } => table,
Dml::Delete { table, .. } => table,
}
}
pub fn initiator(&self) -> UserId {
match self {
Dml::Insert { initiator, .. } => *initiator,
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment