diff --git a/src/traft/node.rs b/src/traft/node.rs index 9ecd1386559ee598e6d904323ea8daf66437d363..debe784338cfb8e45698fbfb8065574030feea4a 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -448,6 +448,7 @@ where JOINT_STATE_LATCH.with(f) } +/// Is called during a transaction fn handle_committed_entries( entries: Vec<raft::Entry>, notifications: &mut HashMap<LogicalClock, Notify>, @@ -455,6 +456,7 @@ fn handle_committed_entries( storage: &mut RaftSpaceAccess, pool: &mut ConnectionPool, topology_changed: &mut bool, + expelled: &mut bool, ) { for entry in &entries { let entry = match traft::Entry::try_from(entry) { @@ -474,6 +476,7 @@ fn handle_committed_entries( notifications, pool, topology_changed, + expelled, raw_node, ), raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { @@ -495,11 +498,13 @@ fn handle_committed_entries( } } +/// Is called during a transaction fn handle_committed_normal_entry( entry: traft::Entry, notifications: &mut HashMap<LogicalClock, Notify>, pool: &mut ConnectionPool, topology_changed: &mut bool, + expelled: &mut bool, raw_node: &mut RawNode, ) { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); @@ -515,7 +520,8 @@ fn handle_committed_normal_entry( pool.connect(peer.raft_id, peer.peer_address.clone()); *topology_changed = true; if peer.grade == Grade::Expelled && peer.raft_id == raw_node.raft.id { - crate::tarantool::exit(0); + // cannot exit during a transaction + *expelled = true; } } @@ -537,6 +543,7 @@ fn handle_committed_normal_entry( }); } +/// Is called during a transaction fn handle_committed_conf_change( entry: traft::Entry, raw_node: &mut RawNode, @@ -600,6 +607,7 @@ fn handle_committed_conf_change( storage.persist_conf_state(&conf_state).unwrap(); } +/// Is called during a transaction fn handle_read_states( read_states: Vec<raft::ReadState>, notifications: &mut HashMap<LogicalClock, Notify>, @@ -620,6 +628,7 @@ fn handle_read_states( } } +/// Is called during a transaction fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { for msg in messages { if let Err(e) = pool.send(&msg) { @@ -669,6 +678,7 @@ fn raft_main_loop( let mut ready: raft::Ready = raw_node.ready(); let mut topology_changed = false; + let mut expelled = false; start_transaction(|| -> Result<(), TransactionError> { if !ready.messages().is_empty() { @@ -691,6 +701,7 @@ fn raft_main_loop( &mut storage, &mut pool, &mut topology_changed, + &mut expelled, ); if !ready.entries().is_empty() { @@ -725,6 +736,10 @@ fn raft_main_loop( }) .unwrap(); + if expelled { + crate::tarantool::exit(0); + } + // Advance the Raft. let mut light_rd = raw_node.advance(ready); @@ -747,6 +762,7 @@ fn raft_main_loop( &mut storage, &mut pool, &mut topology_changed, + &mut expelled, ); // Advance the apply index. @@ -755,6 +771,10 @@ fn raft_main_loop( }) .unwrap(); + if expelled { + crate::tarantool::exit(0); + } + if topology_changed { event::broadcast(Event::TopologyChanged); if let Some(peer) = traft::Storage::peer_by_raft_id(raw_node.raft.id).unwrap() {