diff --git a/src/traft/node.rs b/src/traft/node.rs index ed05cea04aefe7cd5c9025d29759b52de46a8f63..d61d71b9452b7b37e053a62a0eee0f7770c7f7d7 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -82,20 +82,27 @@ enum NormalRequest { /// Propose `raft::prelude::Entry` of `EntryNormal` kind. /// Make a notification when it's committed. ProposeNormal { op: traft::Op, notify: Notify }, + /// Propose `raft::prelude::Entry` of `EntryConfChange` kind. - /// Make a notification when it's committed. + /// Make a notification when the second EntryConfChange is + /// committed (that corresponds to the leaving of a joint state). ProposeConfChange { + term: u64, peers: Vec<traft::Peer>, notify: Notify, }, + /// Get a read barrier. In some systems it's also called the "quorum read". /// Make a notification when index is read. ReadIndex { notify: Notify }, + /// Start a new raft term . /// Make a notification when request is processed. Campaign { notify: Notify }, + /// Handle message from anoher raft node. Step(raft::Message), + /// Tick the node. /// Make a notification when request is processed. Tick { n_times: u32, notify: Notify }, @@ -282,7 +289,11 @@ fn raft_main_loop( notifications.insert(lc.clone(), notify); } } - NormalRequest::ProposeConfChange { peers, notify } => { + NormalRequest::ProposeConfChange { + term, + peers, + notify, + } => { // In some states proposing a ConfChange is impossible. // Check if there's a reason to reject it. @@ -296,6 +307,10 @@ fn raft_main_loop( break Some("not a leader"); } + if term != raw_node.raft.term { + break Some("raft term mismatch"); + } + // Without this check the node would silently ignore the conf change. // See https://github.com/tikv/raft-rs/blob/v0.6.0/src/raft.rs#L2014-L2026 if raw_node.raft.has_pending_conf() { @@ -336,8 +351,14 @@ fn raft_main_loop( if let Err(e) = raw_node.propose_conf_change(ctx, cc) { notify.try_send(Err(e)).expect("that's a bug"); } else { + // oops, current instance isn't actually a leader + // (which is impossible in theory, but we're not + // sure in practice) and sent the ConfChange message + // to the raft network instead of appending it to the + // raft log. let last_index = raw_node.raft.raft_log.last_index(); assert!(last_index == prev_index + 1); + joint_state_latch = Some(JointStateLatch { index: last_index, notify, @@ -622,6 +643,7 @@ fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<Nor tlog!(Info, "processing batch: {ids:?}"); // 1. Gererate raft_id + let term = Storage::term().unwrap().unwrap_or(0); let mut peers = Vec::<traft::Peer>::new(); let mut max_id = Storage::max_peer_id().unwrap(); for (req, notify) in &batch { @@ -657,7 +679,11 @@ fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<Nor // 2. Propose the ConfChange. let (rx, tx) = fiber::Channel::new(1).into_clones(); - main_inbox.send(NormalRequest::ProposeConfChange { peers, notify: tx }); + main_inbox.send(NormalRequest::ProposeConfChange { + term, + peers, + notify: tx, + }); // main_loop gives the warranty that every ProposeConfChange // will sometimes be handled and there's no need in timeout.