Skip to content
Snippets Groups Projects

refactor(network): PoolWorker now reads peer address from storage

Merged Georgy Moshkin requested to merge refactor/connection-pool into master
Files
2
+ 90
69
@@ -5,7 +5,6 @@ use ::tarantool::net_box::ConnOptions;
use ::tarantool::net_box::Options;
use ::tarantool::util::IntoClones;
use std::cell::Cell;
use std::cell::RefCell;
use std::collections::HashMap;
use std::rc::Rc;
use std::time::Duration;
@@ -14,7 +13,9 @@ use crate::mailbox::Mailbox;
use crate::tlog;
use crate::traft;
use crate::traft::error::PoolSendError;
use crate::traft::storage::peer_field::{self, PeerAddress};
use crate::traft::RaftId;
use crate::traft::{PeerStorage, Storage};
#[derive(Clone, Debug)]
struct WorkerOptions {
@@ -35,29 +36,29 @@ impl Default for WorkerOptions {
}
}
////////////////////////////////////////////////////////////////////////////////
// PoolWorker
////////////////////////////////////////////////////////////////////////////////
struct PoolWorker {
id: RaftId,
uri: Rc<RefCell<String>>,
inbox: Mailbox<traft::MessagePb>,
fiber: fiber::LuaUnitJoinHandle<'static>,
stop_flag: Rc<Cell<Option<()>>>,
}
impl PoolWorker {
pub fn run(id: RaftId, uri: String, opts: WorkerOptions) -> PoolWorker {
let uri = Rc::new(RefCell::new(uri));
pub fn run(id: RaftId, storage: PeerStorage, opts: WorkerOptions) -> PoolWorker {
let inbox = Mailbox::new();
let stop_flag: Rc<Cell<Option<()>>> = Default::default();
let fiber = fiber::defer_proc({
let uri = uri.clone();
let inbox = inbox.clone();
let stop_flag = stop_flag.clone();
move || Self::worker_loop(uri, inbox, stop_flag, &opts)
move || Self::worker_loop(id, storage, inbox, stop_flag, &opts)
});
Self {
id,
uri,
fiber,
inbox,
stop_flag,
@@ -65,13 +66,14 @@ impl PoolWorker {
}
fn worker_loop(
uri: Rc<RefCell<String>>,
raft_id: RaftId,
storage: PeerStorage,
inbox: Mailbox<traft::MessagePb>,
stop_flag: Rc<Cell<Option<()>>>,
opts: &WorkerOptions,
) {
struct ConnCache {
uri: String,
address: String,
conn: Conn,
}
let cache: Cell<Option<ConnCache>> = Cell::default();
@@ -89,30 +91,33 @@ impl PoolWorker {
continue;
}
let mut conn_cache = match cache.take() {
Some(v) if uri.borrow().eq(&v.uri) => Some(v),
_ => None,
};
let ConnCache { address, conn } = ::tarantool::unwrap_or!(cache.take(), {
let address = storage.field_by_raft_id::<PeerAddress>(raft_id);
let address = crate::unwrap_ok_or!(address,
Err(e) => {
tlog!(Warning, "failed getting peer address: {e}";
"raft_id" => raft_id,
);
continue
}
);
if conn_cache.is_none() {
let uri = uri.borrow();
let conn_opts = ConnOptions {
connect_timeout: opts.connect_timeout,
..Default::default()
};
conn_cache = Conn::new(uri.clone(), conn_opts, None)
.map(|conn| ConnCache {
uri: uri.clone(),
conn,
})
.map_err(|e| tlog!(Debug, "{uri}: {e}"))
.ok();
}
let (uri, conn) = match &conn_cache {
Some(v) => (&v.uri, &v.conn),
None => continue,
};
let conn = Conn::new(&address, conn_opts, None);
let conn = crate::unwrap_ok_or!(conn,
Err(e) => {
tlog!(Debug, "failed establishing connection to peer: {e}";
"peer" => address,
"raft_id" => raft_id,
);
continue
}
);
ConnCache { address, conn }
});
let call_opts = Options {
timeout: Some(opts.call_timeout),
@@ -121,8 +126,10 @@ impl PoolWorker {
// implicit yield
match conn.call(opts.handler_name, &messages, &call_opts) {
Ok(_) => cache.set(conn_cache),
Err(e) => tlog!(Debug, "Interact with {uri} -> {e}"),
Ok(_) => cache.set(Some(ConnCache { address, conn })),
Err(e) => tlog!(Debug, "failed sending messages to peer: {e}";
"peer" => address,
),
}
if stop_flag.take().is_some() {
@@ -131,10 +138,6 @@ impl PoolWorker {
}
}
pub fn set_uri(&self, uri: String) {
self.uri.replace(uri);
}
pub fn send(&self, msg: raft::Message) -> Result<(), PoolSendError> {
self.inbox.send(msg.into());
Ok(())
@@ -148,21 +151,22 @@ impl PoolWorker {
impl std::fmt::Debug for PoolWorker {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("PoolWorker")
.field("id", &self.id)
.field("uri", &self.uri)
.finish()
f.debug_struct("PoolWorker").field("id", &self.id).finish()
}
}
#[derive(Default)]
////////////////////////////////////////////////////////////////////////////////
// ConnectionPoolBuilder
////////////////////////////////////////////////////////////////////////////////
pub struct ConnectionPoolBuilder {
worker_options: WorkerOptions,
storage: PeerStorage,
}
macro_rules! builder_option {
($opt:ident, $t:ty) => {
pub fn $opt(&'_ mut self, val: $t) -> &'_ mut Self {
pub fn $opt(mut self, val: $t) -> Self {
self.worker_options.$opt = val;
self
}
@@ -175,34 +179,32 @@ impl ConnectionPoolBuilder {
builder_option!(connect_timeout, Duration);
builder_option!(inactivity_timeout, Duration);
pub fn build(&self) -> ConnectionPool {
pub fn build(self) -> ConnectionPool {
ConnectionPool {
worker_options: self.worker_options.clone(),
worker_options: self.worker_options,
workers: HashMap::new(),
storage: self.storage,
}
}
}
////////////////////////////////////////////////////////////////////////////////
// ConnectionPool
////////////////////////////////////////////////////////////////////////////////
#[derive(Debug)]
pub struct ConnectionPool {
worker_options: WorkerOptions,
workers: HashMap<RaftId, PoolWorker>,
storage: PeerStorage,
}
impl ConnectionPool {
pub fn builder() -> ConnectionPoolBuilder {
ConnectionPoolBuilder::default()
}
/// Create a worker for communicating with another node.
/// Connection is established lazily at the first request.
/// It's also re-established automatically upon any error.
pub fn connect(&mut self, id: RaftId, uri: String) {
let worker_options = self.worker_options.clone();
self.workers
.entry(id)
.and_modify(|wrk| wrk.set_uri(uri.clone()))
.or_insert_with(|| PoolWorker::run(id, uri.clone(), worker_options));
pub fn builder(storage: PeerStorage) -> ConnectionPoolBuilder {
ConnectionPoolBuilder {
storage,
worker_options: Default::default(),
}
}
#[allow(dead_code)]
@@ -216,15 +218,21 @@ impl ConnectionPool {
/// of the raft node to re-send it later.
///
/// This function never yields.
pub fn send(&self, msg: &raft::Message) -> Result<(), PoolSendError> {
// tlog!(Debug, "Sending {msg:?}");
let wrk = self
.workers
.get(&msg.to)
.ok_or(PoolSendError::UnknownRecipient(msg.to))?;
// let msg = Message::try_from(msg.clone())?;
wrk.send(msg.clone())
pub fn send(&mut self, msg: raft::Message) -> Result<(), PoolSendError> {
let raft_id = msg.to;
if !self.workers.contains_key(&msg.to) {
let storage = self.storage.clone();
let worker_options = self.worker_options.clone();
// check that peer exists
storage
.field_by_raft_id::<peer_field::RaftId>(raft_id)
.map_err(|_| PoolSendError::UnknownRecipient(raft_id))?;
let wrk = PoolWorker::run(raft_id, storage, worker_options);
self.workers.insert(raft_id, wrk);
}
let wrk = self.workers.get(&raft_id).expect("just inserted it");
wrk.send(msg)
}
}
@@ -236,6 +244,10 @@ impl Drop for ConnectionPool {
}
}
////////////////////////////////////////////////////////////////////////////////
// tests
////////////////////////////////////////////////////////////////////////////////
// TODO test connecting twice (reconnecting)
// thread 'main' panicked at 'UnitJoinHandle dropped before being joined',
// picodata::traft::network::ConnectionPool::connect
@@ -264,15 +276,24 @@ inventory::submit!(crate::InnerTest {
}),
);
let storage = Storage::peers_access();
// Connect to the current Tarantool instance
let mut pool = ConnectionPool::builder()
let mut pool = ConnectionPool::builder(storage.clone())
.handler_name("test_interact")
.call_timeout(Duration::from_millis(50))
.connect_timeout(Duration::from_millis(50))
.build();
let listen: String = l.eval("return box.info.listen").unwrap();
storage
.persist_peer(&traft::Peer {
raft_id: 1337,
peer_address: listen.clone(),
..Default::default()
})
.unwrap();
tlog!(Info, "TEST: connecting {listen}");
pool.connect(1337, listen);
// pool.connect(1337, listen);
let heartbeat_to_from = |to: RaftId, from: RaftId| raft::Message {
msg_type: raft::MessageType::MsgHeartbeat,
@@ -283,7 +304,7 @@ inventory::submit!(crate::InnerTest {
// Send a request
// TODO: assert there's no yield
pool.send(&heartbeat_to_from(1337, 1)).unwrap();
pool.send(heartbeat_to_from(1337, 1)).unwrap();
// Assert it arrives
// Assert equality
@@ -294,7 +315,7 @@ inventory::submit!(crate::InnerTest {
// Assert unknown recepient error
assert!(matches!(
pool.send(&heartbeat_to_from(9999, 3)).unwrap_err(),
pool.send(heartbeat_to_from(9999, 3)).unwrap_err(),
PoolSendError::UnknownRecipient(9999)
));
@@ -314,7 +335,7 @@ inventory::submit!(crate::InnerTest {
// Send the second request
// TODO: assert there's no yield
pool.send(&heartbeat_to_from(1337, 4)).unwrap();
pool.send(heartbeat_to_from(1337, 4)).unwrap();
// Assert it arrives too
// Assert equality
Loading