diff --git a/src/rpc/join.rs b/src/rpc/join.rs index 9338b0de1e70a85675c2423ba434dd02b15a1e6b..ba924f015288a9de95a8d3bf42fc98605abe2bb2 100644 --- a/src/rpc/join.rs +++ b/src/rpc/join.rs @@ -118,7 +118,6 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R cas::Range::new(ClusterwideTable::Tier), cas::Range::new(ClusterwideTable::Replicaset), ]; - // Only in this order - so that when instance exists - address will always be there. let cas_req = crate::cas::Request::new( Op::BatchDml { ops }, cas::Predicate { @@ -129,24 +128,25 @@ pub fn handle_join_request_and_wait(req: Request, timeout: Duration) -> Result<R ADMIN_ID, )?; let res = cas::compare_and_swap(&cas_req, deadline.duration_since(fiber::clock())); - match res { - Ok((index, term)) => { - node.wait_index(index, deadline.duration_since(fiber::clock()))?; - if term != raft::Storage::term(raft_storage, index)? { - // leader switched - retry - continue; - } - } - Err(err) => { - if err.is_retriable() { - // cas error - retry - fiber::sleep(Duration::from_millis(500)); + let (index, term) = crate::unwrap_ok_or!(res, + Err(e) => { + if e.is_retriable() { + crate::tlog!(Debug, "local CaS rejected: {e}"); + fiber::sleep(Duration::from_millis(250)); continue; } else { - return Err(err); + return Err(e); } } + ); + + node.wait_index(index, deadline.duration_since(fiber::clock()))?; + + if term != raft::Storage::term(&node.raft_storage, index)? { + // Leader has changed and the entry got rolled back, retry. + continue; } + node.main_loop.wakeup(); // A joined instance needs to communicate with other nodes. diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs index a796b4feb241fee1d341708ae6d87e6045dc8082..ffcab28e0646833ba49b3045afdfd7b2944830a5 100644 --- a/src/rpc/update_instance.rs +++ b/src/rpc/update_instance.rs @@ -221,27 +221,28 @@ pub fn handle_update_instance_request_in_governor_and_also_wait_too( ADMIN_ID, )?; let res = cas::compare_and_swap(&cas_req, deadline.duration_since(fiber::clock())); - match res { - Ok((index, term)) => { - node.wait_index(index, deadline.duration_since(fiber::clock()))?; - if term != raft::Storage::term(raft_storage, index)? { - // leader switched - retry - continue; - } - } - Err(err) => { + let (index, term) = crate::unwrap_ok_or!(res, + Err(e) => { if req.dont_retry { - return Err(err); + return Err(e); } - if err.is_retriable() { - // cas error - retry - fiber::sleep(Duration::from_millis(500)); + if e.is_retriable() { + crate::tlog!(Debug, "local CaS rejected: {e}"); + fiber::sleep(Duration::from_millis(250)); continue; } else { - return Err(err); + return Err(e); } } + ); + + node.wait_index(index, deadline.duration_since(fiber::clock()))?; + + if term != raft::Storage::term(raft_storage, index)? { + // Leader has changed and the entry got rolled back, retry. + continue; } + node.main_loop.wakeup(); drop(guard); return Ok(()); diff --git a/src/schema.rs b/src/schema.rs index 997e6b3dd5b6ae6531e5d54d71ca3674390b51db..d81bc57909debe0f36b6c54a3a6842c8a315e0be 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -2482,9 +2482,19 @@ pub fn abort_ddl(timeout: Duration) -> traft::Result<RaftIndex> { instance_id, }; let req = Request::new(Op::DdlAbort { cause }, predicate, effective_user_id())?; - // FIXME: this error handling is wrong, must retry if e.is_retriable() - let (index, term) = compare_and_swap(&req, timeout)?; + let res = compare_and_swap(&req, timeout); + let (index, term) = crate::unwrap_ok_or!(res, + Err(e) => { + if e.is_retriable() { + continue; + } else { + return Err(e); + } + } + ); + node.wait_index(index, timeout)?; + if raft::Storage::term(&node.raft_storage, index)? != term { // leader switched - retry continue;