From 1de45a056b86e4f6ad331608bd74c8addda58c0f Mon Sep 17 00:00:00 2001 From: Georgy Moshkin <gmoshkin@picodata.io> Date: Thu, 29 Sep 2022 13:53:21 +0300 Subject: [PATCH] refactor(storage): use PeerStorage in node.rs --- src/main.rs | 22 +++++++++++----------- src/traft/node.rs | 44 ++++++++++++++++++++++++++++---------------- 2 files changed, 39 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index 2609fe3aec..951deff305 100644 --- a/src/main.rs +++ b/src/main.rs @@ -506,16 +506,16 @@ fn main_run(args: args::Run) -> ! { } } -fn init_common(args: &args::Run, cfg: &tarantool::Cfg) -> RaftSpaceAccess { +fn init_common(args: &args::Run, cfg: &tarantool::Cfg) -> (RaftSpaceAccess, PeerStorage) { std::fs::create_dir_all(&args.data_dir).unwrap(); tarantool::set_cfg(cfg); let peer_storage = PeerStorage::new().expect("RaftSpaceAccess initialization failed"); - traft::Storage::init_schema(peer_storage); + traft::Storage::init_schema(peer_storage.clone()); let storage = RaftSpaceAccess::new().expect("RaftSpaceAccess initialization failed"); init_handlers(); traft::event::init(); - storage + (storage, peer_storage) } fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { @@ -535,7 +535,7 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { ..Default::default() }; - let storage = init_common(args, &cfg); + let (storage, peer_storage) = init_common(args, &cfg); discovery::init_global(&args.peers); cfg.listen = Some(args.listen.clone()); @@ -543,7 +543,7 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { // TODO assert traft::Storage::instance_id == (null || args.instance_id) if storage.raft_id().unwrap().is_some() { - return postjoin(args, storage); + return postjoin(args, storage, peer_storage); } let role = discovery::wait_global(); @@ -597,7 +597,7 @@ fn start_boot(args: &args::Run) { ..Default::default() }; - let mut storage = init_common(args, &cfg); + let (mut storage, peer_storage) = init_common(args, &cfg); start_transaction(|| -> Result<(), TntError> { let cs = raft::ConfState { @@ -677,7 +677,7 @@ fn start_boot(args: &args::Run) { }) .unwrap(); - postjoin(args, storage) + postjoin(args, storage, peer_storage) } fn start_join(args: &args::Run, leader_address: String) { @@ -735,7 +735,7 @@ fn start_join(args: &args::Run, leader_address: String) { ..Default::default() }; - let mut storage = init_common(args, &cfg); + let (mut storage, peer_storage) = init_common(args, &cfg); let raft_id = resp.peer.raft_id; start_transaction(|| -> Result<(), TntError> { @@ -750,10 +750,10 @@ fn start_join(args: &args::Run, leader_address: String) { }) .unwrap(); - postjoin(args, storage) + postjoin(args, storage, peer_storage) } -fn postjoin(args: &args::Run, storage: RaftSpaceAccess) { +fn postjoin(args: &args::Run, storage: RaftSpaceAccess, peer_storage: PeerStorage) { tlog!(Info, ">>>>> postjoin()"); let mut box_cfg = tarantool::cfg().unwrap(); @@ -763,7 +763,7 @@ fn postjoin(args: &args::Run, storage: RaftSpaceAccess) { box_cfg.replication_connect_quorum = 0; tarantool::set_cfg(&box_cfg); - let node = traft::node::Node::new(storage.clone()); + let node = traft::node::Node::new(storage.clone(), peer_storage); let node = node.expect("failed initializing raft node"); traft::node::set_global(node); let node = traft::node::global().unwrap(); diff --git a/src/traft/node.rs b/src/traft/node.rs index 237a99ecfd..5f706a66a6 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -55,7 +55,7 @@ use crate::traft::TopologyRequest; use crate::traft::{ ExpelRequest, ExpelResponse, JoinRequest, JoinResponse, SyncRaftRequest, UpdatePeerRequest, }; -use crate::traft::{RaftSpaceAccess, Storage}; +use crate::traft::{PeerStorage, RaftSpaceAccess, Storage}; use super::Grade; use super::OpResult; @@ -87,6 +87,7 @@ pub struct Node { node_impl: Rc<Mutex<NodeImpl>>, pub(super) storage: RaftSpaceAccess, + pub(super) peer_storage: PeerStorage, main_loop: MainLoop, _conf_change_loop: fiber::UnitJoinHandle<'static>, status: Rc<RefCell<Status>>, @@ -103,8 +104,8 @@ impl std::fmt::Debug for Node { impl Node { /// Initialize the raft node. /// **This function yields** - pub fn new(storage: RaftSpaceAccess) -> Result<Self, RaftError> { - let node_impl = NodeImpl::new(storage.clone())?; + pub fn new(storage: RaftSpaceAccess, peer_storage: PeerStorage) -> Result<Self, RaftError> { + let node_impl = NodeImpl::new(storage.clone(), peer_storage.clone())?; let raft_id = node_impl.raft_id(); let status = Rc::new(RefCell::new(Status { @@ -119,7 +120,8 @@ impl Node { let conf_change_loop_fn = { let status = status.clone(); let storage = storage.clone(); - move || raft_conf_change_loop(status, storage) + let peer_storage = peer_storage.clone(); + move || raft_conf_change_loop(status, storage, peer_storage) }; let node = Node { @@ -132,6 +134,7 @@ impl Node { .unwrap(), node_impl, storage, + peer_storage, status, }; @@ -270,6 +273,7 @@ struct NodeImpl { topology_cache: KVCell<RaftTerm, Topology>, joint_state_latch: KVCell<RaftIndex, Notify>, storage: RaftSpaceAccess, + peer_storage: PeerStorage, pool: ConnectionPool, lc: LogicalClock, } @@ -277,6 +281,7 @@ struct NodeImpl { impl NodeImpl { fn new( mut storage: RaftSpaceAccess, + peer_storage: PeerStorage, // TODO: provide clusterwide space access ) -> Result<Self, RaftError> { let box_err = |e| StorageError::Other(Box::new(e)); @@ -289,8 +294,7 @@ impl NodeImpl { LogicalClock::new(raft_id, gen) }; - let peer_storage = Storage::peers_access().clone(); - let pool = ConnectionPool::builder(peer_storage) + let pool = ConnectionPool::builder(peer_storage.clone()) .handler_name(stringify_cfunc!(raft_interact)) .call_timeout(MainLoop::TICK * 4) .connect_timeout(MainLoop::TICK * 4) @@ -312,6 +316,7 @@ impl NodeImpl { topology_cache: KVCell::new(), joint_state_latch: KVCell::new(), storage, + peer_storage, pool, lc, }) @@ -341,8 +346,8 @@ impl NodeImpl { let topology: Topology = unwrap_some_or! { self.topology_cache.take_or_drop(¤t_term), { - let peers = Storage::peers().map_err(box_err)?; - let replication_factor = Storage::replication_factor().map_err(box_err)?.unwrap(); + let peers = self.peer_storage.all_peers().map_err(box_err)?; + let replication_factor = Storage::replication_factor()?.unwrap(); Topology::from_peers(peers).with_replication_factor(replication_factor) } }; @@ -983,14 +988,18 @@ fn raft_conf_change(storage: &RaftSpaceAccess, peers: &[Peer]) -> Option<raft::C Some(conf_change) } -fn raft_conf_change_loop(status: Rc<RefCell<Status>>, storage: RaftSpaceAccess) { +fn raft_conf_change_loop( + status: Rc<RefCell<Status>>, + storage: RaftSpaceAccess, + peer_storage: PeerStorage, +) { loop { if status.borrow().raft_state != "Leader" { event::wait(Event::StatusChanged).expect("Events system must be initialized"); continue; } - let peers = Storage::peers().unwrap(); + let peers = peer_storage.all_peers().unwrap(); let term = storage.term().unwrap().unwrap_or(0); let node = global().expect("must be initialized"); @@ -1072,16 +1081,19 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error } let peer = node.handle_topology_request_and_wait(req.into())?; - let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?; + let box_replication = node + .peer_storage + .replicaset_peer_addresses(&peer.replicaset_id, Some(peer.commit_index))?; // A joined peer needs to communicate with other nodes. // Provide it the list of raft voters in response. let mut raft_group = vec![]; for raft_id in node.storage.voters()?.unwrap_or_default().into_iter() { - if let Some(peer) = Storage::peer_by_raft_id(raft_id)? { - raft_group.push(peer); - } else { - crate::warn_or_panic!("peer missing in storage, raft_id: {}", raft_id); + match node.peer_storage.get(&raft_id) { + Err(e) => { + crate::warn_or_panic!("failed reading peer with id `{}`: {}", raft_id, e); + } + Ok(peer) => raft_group.push(peer), } } @@ -1129,7 +1141,7 @@ fn raft_expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std: fn expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> { let node = global()?; let leader_id = node.status().leader_id.ok_or("leader_id not found")?; - let leader = Storage::peer_by_raft_id(leader_id).unwrap().unwrap(); + let leader = node.peer_storage.get(&leader_id).unwrap(); let leader_address = leader.peer_address; let fn_name = stringify_cfunc!(traft::node::raft_expel_on_leader); -- GitLab