From 84f665fd793d150af32a2917fe9c171d9c1b3a10 Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Wed, 21 Sep 2022 19:01:23 +0300 Subject: [PATCH] refactor(network): PoolWorker now reads peer address from storage. --- src/traft/network.rs | 159 ++++++++++++++++++++++++------------------- src/traft/node.rs | 12 ++-- 2 files changed, 94 insertions(+), 77 deletions(-) diff --git a/src/traft/network.rs b/src/traft/network.rs index 2cc42d7ddb..4d6aba2e69 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -5,7 +5,6 @@ use ::tarantool::net_box::ConnOptions; use ::tarantool::net_box::Options; use ::tarantool::util::IntoClones; use std::cell::Cell; -use std::cell::RefCell; use std::collections::HashMap; use std::rc::Rc; use std::time::Duration; @@ -14,7 +13,9 @@ use crate::mailbox::Mailbox; use crate::tlog; use crate::traft; use crate::traft::error::PoolSendError; +use crate::traft::storage::peer_field::{self, PeerAddress}; use crate::traft::RaftId; +use crate::traft::{PeerStorage, Storage}; #[derive(Clone, Debug)] struct WorkerOptions { @@ -35,29 +36,29 @@ impl Default for WorkerOptions { } } +//////////////////////////////////////////////////////////////////////////////// +// PoolWorker +//////////////////////////////////////////////////////////////////////////////// + struct PoolWorker { id: RaftId, - uri: Rc<RefCell<String>>, inbox: Mailbox<traft::MessagePb>, fiber: fiber::LuaUnitJoinHandle<'static>, stop_flag: Rc<Cell<Option<()>>>, } impl PoolWorker { - pub fn run(id: RaftId, uri: String, opts: WorkerOptions) -> PoolWorker { - let uri = Rc::new(RefCell::new(uri)); + pub fn run(id: RaftId, storage: PeerStorage, opts: WorkerOptions) -> PoolWorker { let inbox = Mailbox::new(); let stop_flag: Rc<Cell<Option<()>>> = Default::default(); let fiber = fiber::defer_proc({ - let uri = uri.clone(); let inbox = inbox.clone(); let stop_flag = stop_flag.clone(); - move || Self::worker_loop(uri, inbox, stop_flag, &opts) + move || Self::worker_loop(id, storage, inbox, stop_flag, &opts) }); Self { id, - uri, fiber, inbox, stop_flag, @@ -65,13 +66,14 @@ impl PoolWorker { } fn worker_loop( - uri: Rc<RefCell<String>>, + raft_id: RaftId, + storage: PeerStorage, inbox: Mailbox<traft::MessagePb>, stop_flag: Rc<Cell<Option<()>>>, opts: &WorkerOptions, ) { struct ConnCache { - uri: String, + address: String, conn: Conn, } let cache: Cell<Option<ConnCache>> = Cell::default(); @@ -89,30 +91,33 @@ impl PoolWorker { continue; } - let mut conn_cache = match cache.take() { - Some(v) if uri.borrow().eq(&v.uri) => Some(v), - _ => None, - }; + let ConnCache { address, conn } = ::tarantool::unwrap_or!(cache.take(), { + let address = storage.field_by_raft_id::<PeerAddress>(raft_id); + let address = crate::unwrap_ok_or!(address, + Err(e) => { + tlog!(Warning, "failed getting peer address: {e}"; + "raft_id" => raft_id, + ); + continue + } + ); - if conn_cache.is_none() { - let uri = uri.borrow(); let conn_opts = ConnOptions { connect_timeout: opts.connect_timeout, ..Default::default() }; - conn_cache = Conn::new(uri.clone(), conn_opts, None) - .map(|conn| ConnCache { - uri: uri.clone(), - conn, - }) - .map_err(|e| tlog!(Debug, "{uri}: {e}")) - .ok(); - } - - let (uri, conn) = match &conn_cache { - Some(v) => (&v.uri, &v.conn), - None => continue, - }; + let conn = Conn::new(&address, conn_opts, None); + let conn = crate::unwrap_ok_or!(conn, + Err(e) => { + tlog!(Debug, "failed establishing connection to peer: {e}"; + "peer" => address, + "raft_id" => raft_id, + ); + continue + } + ); + ConnCache { address, conn } + }); let call_opts = Options { timeout: Some(opts.call_timeout), @@ -121,8 +126,10 @@ impl PoolWorker { // implicit yield match conn.call(opts.handler_name, &messages, &call_opts) { - Ok(_) => cache.set(conn_cache), - Err(e) => tlog!(Debug, "Interact with {uri} -> {e}"), + Ok(_) => cache.set(Some(ConnCache { address, conn })), + Err(e) => tlog!(Debug, "failed sending messages to peer: {e}"; + "peer" => address, + ), } if stop_flag.take().is_some() { @@ -131,10 +138,6 @@ impl PoolWorker { } } - pub fn set_uri(&self, uri: String) { - self.uri.replace(uri); - } - pub fn send(&self, msg: raft::Message) -> Result<(), PoolSendError> { self.inbox.send(msg.into()); Ok(()) @@ -148,21 +151,22 @@ impl PoolWorker { impl std::fmt::Debug for PoolWorker { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - f.debug_struct("PoolWorker") - .field("id", &self.id) - .field("uri", &self.uri) - .finish() + f.debug_struct("PoolWorker").field("id", &self.id).finish() } } -#[derive(Default)] +//////////////////////////////////////////////////////////////////////////////// +// ConnectionPoolBuilder +//////////////////////////////////////////////////////////////////////////////// + pub struct ConnectionPoolBuilder { worker_options: WorkerOptions, + storage: PeerStorage, } macro_rules! builder_option { ($opt:ident, $t:ty) => { - pub fn $opt(&'_ mut self, val: $t) -> &'_ mut Self { + pub fn $opt(mut self, val: $t) -> Self { self.worker_options.$opt = val; self } @@ -175,34 +179,32 @@ impl ConnectionPoolBuilder { builder_option!(connect_timeout, Duration); builder_option!(inactivity_timeout, Duration); - pub fn build(&self) -> ConnectionPool { + pub fn build(self) -> ConnectionPool { ConnectionPool { - worker_options: self.worker_options.clone(), + worker_options: self.worker_options, workers: HashMap::new(), + storage: self.storage, } } } +//////////////////////////////////////////////////////////////////////////////// +// ConnectionPool +//////////////////////////////////////////////////////////////////////////////// + #[derive(Debug)] pub struct ConnectionPool { worker_options: WorkerOptions, workers: HashMap<RaftId, PoolWorker>, + storage: PeerStorage, } impl ConnectionPool { - pub fn builder() -> ConnectionPoolBuilder { - ConnectionPoolBuilder::default() - } - - /// Create a worker for communicating with another node. - /// Connection is established lazily at the first request. - /// It's also re-established automatically upon any error. - pub fn connect(&mut self, id: RaftId, uri: String) { - let worker_options = self.worker_options.clone(); - self.workers - .entry(id) - .and_modify(|wrk| wrk.set_uri(uri.clone())) - .or_insert_with(|| PoolWorker::run(id, uri.clone(), worker_options)); + pub fn builder(storage: PeerStorage) -> ConnectionPoolBuilder { + ConnectionPoolBuilder { + storage, + worker_options: Default::default(), + } } #[allow(dead_code)] @@ -216,15 +218,21 @@ impl ConnectionPool { /// of the raft node to re-send it later. /// /// This function never yields. - pub fn send(&self, msg: &raft::Message) -> Result<(), PoolSendError> { - // tlog!(Debug, "Sending {msg:?}"); - - let wrk = self - .workers - .get(&msg.to) - .ok_or(PoolSendError::UnknownRecipient(msg.to))?; - // let msg = Message::try_from(msg.clone())?; - wrk.send(msg.clone()) + pub fn send(&mut self, msg: raft::Message) -> Result<(), PoolSendError> { + let raft_id = msg.to; + if !self.workers.contains_key(&msg.to) { + let storage = self.storage.clone(); + let worker_options = self.worker_options.clone(); + // check that peer exists + storage + .field_by_raft_id::<peer_field::RaftId>(raft_id) + .map_err(|_| PoolSendError::UnknownRecipient(raft_id))?; + let wrk = PoolWorker::run(raft_id, storage, worker_options); + self.workers.insert(raft_id, wrk); + } + + let wrk = self.workers.get(&raft_id).expect("just inserted it"); + wrk.send(msg) } } @@ -236,6 +244,10 @@ impl Drop for ConnectionPool { } } +//////////////////////////////////////////////////////////////////////////////// +// tests +//////////////////////////////////////////////////////////////////////////////// + // TODO test connecting twice (reconnecting) // thread 'main' panicked at 'UnitJoinHandle dropped before being joined', // picodata::traft::network::ConnectionPool::connect @@ -264,15 +276,24 @@ inventory::submit!(crate::InnerTest { }), ); + let storage = Storage::peers_access(); // Connect to the current Tarantool instance - let mut pool = ConnectionPool::builder() + let mut pool = ConnectionPool::builder(storage.clone()) .handler_name("test_interact") .call_timeout(Duration::from_millis(50)) .connect_timeout(Duration::from_millis(50)) .build(); let listen: String = l.eval("return box.info.listen").unwrap(); + + storage + .persist_peer(&traft::Peer { + raft_id: 1337, + peer_address: listen.clone(), + ..Default::default() + }) + .unwrap(); tlog!(Info, "TEST: connecting {listen}"); - pool.connect(1337, listen); + // pool.connect(1337, listen); let heartbeat_to_from = |to: RaftId, from: RaftId| raft::Message { msg_type: raft::MessageType::MsgHeartbeat, @@ -283,7 +304,7 @@ inventory::submit!(crate::InnerTest { // Send a request // TODO: assert there's no yield - pool.send(&heartbeat_to_from(1337, 1)).unwrap(); + pool.send(heartbeat_to_from(1337, 1)).unwrap(); // Assert it arrives // Assert equality @@ -294,7 +315,7 @@ inventory::submit!(crate::InnerTest { // Assert unknown recepient error assert!(matches!( - pool.send(&heartbeat_to_from(9999, 3)).unwrap_err(), + pool.send(heartbeat_to_from(9999, 3)).unwrap_err(), PoolSendError::UnknownRecipient(9999) )); @@ -314,7 +335,7 @@ inventory::submit!(crate::InnerTest { // Send the second request // TODO: assert there's no yield - pool.send(&heartbeat_to_from(1337, 4)).unwrap(); + pool.send(heartbeat_to_from(1337, 4)).unwrap(); // Assert it arrives too // Assert equality diff --git a/src/traft/node.rs b/src/traft/node.rs index acef89c61c..0e3f747a70 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -297,17 +297,14 @@ impl NodeImpl { LogicalClock::new(raft_id, gen) }; - let mut pool = ConnectionPool::builder() + let peer_storage = Storage::peers_access().clone(); + let pool = ConnectionPool::builder(peer_storage) .handler_name(stringify_cfunc!(raft_interact)) .call_timeout(Node::TICK * 4) .connect_timeout(Node::TICK * 4) .inactivity_timeout(Duration::from_secs(60)) .build(); - for peer in Storage::peers()? { - pool.connect(peer.raft_id, peer.peer_address); - } - let cfg = raft::Config { id: raft_id, applied, @@ -556,7 +553,6 @@ impl NodeImpl { } if let Some(traft::Op::PersistPeer { peer }) = entry.op() { - self.pool.connect(peer.raft_id, peer.peer_address.clone()); *topology_changed = true; if peer.grade == Grade::Expelled && peer.raft_id == self.raft_id() { // cannot exit during a transaction @@ -661,9 +657,9 @@ impl NodeImpl { } /// Is called during a transaction - fn handle_messages(&self, messages: Vec<raft::Message>) { + fn handle_messages(&mut self, messages: Vec<raft::Message>) { for msg in messages { - if let Err(e) = self.pool.send(&msg) { + if let Err(e) = self.pool.send(msg) { tlog!(Error, "{e}"); } } -- GitLab