Skip to content
Snippets Groups Projects
Commit d78cbe14 authored by Yaroslav Dynnikov's avatar Yaroslav Dynnikov Committed by Yaroslav Dynnikov
Browse files

refactor: move conn_pool to the InnerNode

parent d45e6406
No related branches found
No related tags found
1 merge request!250Move the code here and there
......@@ -269,6 +269,7 @@ struct InnerNode {
pub notifications: HashMap<LogicalClock, Notify>,
topology_cache: CachedCell<RaftTerm, Topology>,
storage: RaftSpaceAccess,
pool: ConnectionPool,
lc: LogicalClock,
}
......@@ -287,6 +288,17 @@ impl InnerNode {
LogicalClock::new(raft_id, gen)
};
let mut pool = ConnectionPool::builder()
.handler_name(stringify_cfunc!(raft_interact))
.call_timeout(Node::TICK * 4)
.connect_timeout(Node::TICK * 4)
.inactivity_timeout(Duration::from_secs(60))
.build();
for peer in Storage::peers()? {
pool.connect(peer.raft_id, peer.peer_address);
}
let cfg = raft::Config {
id: raft_id,
applied,
......@@ -301,6 +313,7 @@ impl InnerNode {
notifications: Default::default(),
topology_cache: CachedCell::new(),
storage,
pool,
lc,
})
}
......@@ -482,7 +495,6 @@ impl InnerNode {
fn handle_committed_entries(
&mut self,
entries: Vec<raft::Entry>,
pool: &mut ConnectionPool,
topology_changed: &mut bool,
expelled: &mut bool,
) {
......@@ -497,7 +509,7 @@ impl InnerNode {
match entry.entry_type {
raft::EntryType::EntryNormal => {
self.handle_committed_normal_entry(entry, pool, topology_changed, expelled)
self.handle_committed_normal_entry(entry, topology_changed, expelled)
}
raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => {
self.handle_committed_conf_change(entry)
......@@ -522,7 +534,6 @@ impl InnerNode {
fn handle_committed_normal_entry(
&mut self,
entry: traft::Entry,
pool: &mut ConnectionPool,
topology_changed: &mut bool,
expelled: &mut bool,
) {
......@@ -536,7 +547,7 @@ impl InnerNode {
}
if let Some(traft::Op::PersistPeer { peer }) = entry.op() {
pool.connect(peer.raft_id, peer.peer_address.clone());
self.pool.connect(peer.raft_id, peer.peer_address.clone());
*topology_changed = true;
if peer.grade == Grade::Expelled && peer.raft_id == self.raft_id() {
// cannot exit during a transaction
......@@ -641,9 +652,9 @@ impl InnerNode {
}
/// Is called during a transaction
fn handle_messages(&self, messages: Vec<raft::Message>, pool: &ConnectionPool) {
fn handle_messages(&self, messages: Vec<raft::Message>) {
for msg in messages {
if let Err(e) = pool.send(&msg) {
if let Err(e) = self.pool.send(&msg) {
tlog!(Error, "{e}");
}
}
......@@ -700,16 +711,6 @@ fn raft_main_loop(
raft_loop_cond: Rc<Cond>,
) {
let mut next_tick = Instant::now() + Node::TICK;
let mut pool = ConnectionPool::builder()
.handler_name(stringify_cfunc!(raft_interact))
.call_timeout(Node::TICK * 4)
.connect_timeout(Node::TICK * 4)
.inactivity_timeout(Duration::from_secs(60))
.build();
for peer in Storage::peers().unwrap() {
pool.connect(peer.raft_id, peer.peer_address);
}
loop {
raft_loop_cond.wait_timeout(Node::TICK);
......@@ -736,7 +737,7 @@ fn raft_main_loop(
if !ready.messages().is_empty() {
// Send out the messages come from the node.
let messages = ready.take_messages();
inner_node.handle_messages(messages, &pool);
inner_node.handle_messages(messages);
}
if !ready.snapshot().is_empty() {
......@@ -748,7 +749,6 @@ fn raft_main_loop(
let committed_entries = ready.take_committed_entries();
inner_node.handle_committed_entries(
committed_entries,
&mut pool,
&mut topology_changed,
&mut expelled,
);
......@@ -775,7 +775,7 @@ fn raft_main_loop(
if !ready.persisted_messages().is_empty() {
// Send out the persisted messages come from the node.
let messages = ready.take_persisted_messages();
inner_node.handle_messages(messages, &pool);
inner_node.handle_messages(messages);
}
let read_states = ready.take_read_states();
......@@ -800,13 +800,12 @@ fn raft_main_loop(
// Send out the messages.
let messages = light_rd.take_messages();
inner_node.handle_messages(messages, &pool);
inner_node.handle_messages(messages);
// Apply all committed entries.
let committed_entries = light_rd.take_committed_entries();
inner_node.handle_committed_entries(
committed_entries,
&mut pool,
&mut topology_changed,
&mut expelled,
);
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment