diff --git a/src/main.rs b/src/main.rs index e1538f9067343aa250c2a59e47876931f6d88e78..477f98e9f269f51ae71d0cb219cd9f38ebad46f6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -511,12 +511,9 @@ fn init_common(args: &args::Run, cfg: &tarantool::Cfg) -> Storage { std::fs::create_dir_all(&args.data_dir).unwrap(); tarantool::set_cfg(cfg); - let storage = Storage::new().expect("Storage initialization failed"); - // TODO: don't use global Storage anywhere - traft::storage::Storage::init_schema(storage.peers.clone()); init_handlers(); traft::event::init(); - storage + Storage::new().expect("Storage initialization failed") } fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { @@ -740,9 +737,9 @@ fn start_join(args: &args::Run, leader_address: String) { let raft_id = resp.peer.raft_id; start_transaction(|| -> Result<(), TntError> { - traft::StorageOld::persist_peer(&resp.peer).unwrap(); + storage.peers.persist_peer(&resp.peer).unwrap(); for peer in resp.raft_group { - traft::StorageOld::persist_peer(&peer).unwrap(); + storage.peers.persist_peer(&peer).unwrap(); } storage.raft.persist_raft_id(raft_id).unwrap(); storage @@ -767,14 +764,13 @@ fn postjoin(args: &args::Run, storage: Storage) { box_cfg.replication_connect_quorum = 0; tarantool::set_cfg(&box_cfg); - let raft_storage = storage.raft.clone(); - let node = traft::node::Node::new(storage); + let node = traft::node::Node::new(storage.clone()); let node = node.expect("failed initializing raft node"); traft::node::set_global(node); let node = traft::node::global().unwrap(); let raft_id = node.raft_id(); - let cs = raft_storage.conf_state().unwrap(); + let cs = storage.raft.conf_state().unwrap(); if cs.voters == [raft_id] { tlog!( Info, @@ -819,10 +815,9 @@ fn postjoin(args: &args::Run, storage: Storage) { } loop { - let peer = traft::StorageOld::peer_by_raft_id(raft_id) - .unwrap() + let peer = storage.peers.get(&raft_id) .expect("peer must be persisted at the time of postjoin"); - let cluster_id = raft_storage + let cluster_id = storage.raft .cluster_id() .unwrap() .expect("cluster_id must be persisted at the time of postjoin"); @@ -833,9 +828,7 @@ fn postjoin(args: &args::Run, storage: Storage) { .with_failure_domain(args.failure_domain()); let leader_id = node.status().leader_id.expect("leader_id deinitialized"); - let leader = traft::StorageOld::peer_by_raft_id(leader_id) - .unwrap() - .unwrap(); + let leader = storage.peers.get(&leader_id).unwrap(); // It's necessary to call `raft_update_peer` remotely on a // leader over net_box. It always fails otherwise. Only the @@ -1035,8 +1028,6 @@ fn test_one(t: &InnerTest) { }; tarantool::set_cfg(&cfg); - // TODO: don't use global Storage in tests - traft::StorageOld::init_schema(traft::storage::Peers::new().unwrap()); tarantool::exec( r#" box.schema.user.grant('guest', 'super', nil, nil, {if_not_exists = true}) diff --git a/src/traft/error.rs b/src/traft/error.rs index 228127360930b88c016f04bb3be2e139dd2516b4..7a448d056666722c30e7ac5252ddf64962f2fac2 100644 --- a/src/traft/error.rs +++ b/src/traft/error.rs @@ -35,7 +35,7 @@ pub enum Error { Tarantool(#[from] ::tarantool::error::Error), #[error("peer with id {0} not found")] NoPeerWithRaftId(RaftId), - #[error("peer with id {0:?} not found")] + #[error("peer with id \"{0}\" not found")] NoPeerWithInstanceId(InstanceId), #[error("other error: {0}")] Other(Box<dyn std::error::Error>), diff --git a/src/traft/failover.rs b/src/traft/failover.rs index 013fcb3c4124732b07e5d93b6cbd44218b5c5503..5872a6722db679a538f169acfb417d10914b7990 100644 --- a/src/traft/failover.rs +++ b/src/traft/failover.rs @@ -9,7 +9,6 @@ use crate::{stringify_cfunc, tarantool, tlog}; use crate::traft::error::Error; use crate::traft::event; use crate::traft::node; -use crate::traft::StorageOld; use crate::traft::TargetGrade; use crate::traft::{UpdatePeerRequest, UpdatePeerResponse}; @@ -41,7 +40,7 @@ pub fn on_shutdown() { return; } - let peer = StorageOld::peer_by_raft_id(raft_id).unwrap().unwrap(); + let peer = node.storage.peers.get(&raft_id).unwrap(); let cluster_id = node .storage .raft diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 68f7ae19470fa55125340fc7f9808d02706ea9ab..d492e69e1acdefa5bcbe65cccf9466d3c5eb8efa 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -225,7 +225,7 @@ impl std::fmt::Display for Op { } impl Op { - pub fn on_commit(&self) -> Box<dyn Any> { + pub fn on_commit(&self, peers: &storage::Peers) -> Box<dyn Any> { match self { Self::Nop => Box::new(()), Self::Info { msg } => { @@ -235,7 +235,7 @@ impl Op { Self::EvalLua(op) => Box::new(op.result()), Self::ReturnOne(op) => Box::new(op.result()), Self::PersistPeer { peer } => { - StorageOld::persist_peer(peer).unwrap(); + peers.persist_peer(peer).unwrap(); Box::new(peer.clone()) } Self::Dml(op) => Box::new(op.result()), diff --git a/src/traft/node.rs b/src/traft/node.rs index cd2e62401b421fc42708e2ddca084b00bfa655ce..c91f9afedd1d8d1963acf1a609fa633e72cf3bad 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -566,7 +566,7 @@ impl NodeImpl { expelled: &mut bool, ) { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); - let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit(); + let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit(&self.storage.peers); if let Some(lc) = entry.lc() { if let Some(notify) = self.notifications.remove(lc) { diff --git a/src/traft/storage.rs b/src/traft/storage.rs index 0df58bd765ccf567e226e8978e5a3711d896d9fb..58ba0d11bfc1daaa85352126c5599b43d83cf49d 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -33,9 +33,6 @@ define_str_enum! { #[error("unknown cluster space {0}")] pub struct UnknownClusterSpace(pub String); -// TODO(gmoshkin): remove this -const RAFT_GROUP: &str = ClusterSpace::Group.as_str(); - //////////////////////////////////////////////////////////////////////////////// // StateKey //////////////////////////////////////////////////////////////////////////////// @@ -79,65 +76,13 @@ fn box_err(e: impl std::error::Error + Sync + Send + 'static) -> StorageError { pub struct Storage; -// TODO: this should be a field in `Storage`. This is static for now, because -// the refactoring will block development. -static mut PEERS_ACCESS: Option<Peers> = None; - impl Storage { - pub fn init_schema(peers: Peers) { - if unsafe { PEERS_ACCESS.is_some() } { - crate::warn_or_panic!("schema reinitialized"); - } - - unsafe { PEERS_ACCESS = Some(peers) }; - } - - pub fn peers_access() -> &'static Peers { - unsafe { PEERS_ACCESS.as_ref().unwrap() } - } - fn space(name: impl AsRef<str> + Into<String>) -> Result<Space, StorageError> { Space::find(name.as_ref()) .ok_or_else(|| Error::NoSuchSpace(name.into())) .map_err(box_err) } - pub fn peer_by_raft_id(raft_id: RaftId) -> Result<Option<traft::Peer>, StorageError> { - Self::peers_access() - .peer_by_raft_id(raft_id) - .map_err(box_err) - } - - pub fn peer_by_instance_id(instance_id: &str) -> Result<Option<traft::Peer>, StorageError> { - Self::peers_access() - .peer_by_instance_id(instance_id) - .map_err(box_err) - } - - pub fn peers() -> Result<Vec<traft::Peer>, StorageError> { - Self::peers_access().all_peers().map_err(box_err) - } - - pub fn box_replication( - replicaset_id: &str, - max_index: Option<RaftIndex>, - ) -> Result<Vec<String>, StorageError> { - Self::peers_access() - .replicaset_peer_addresses(replicaset_id, max_index) - .map_err(box_err) - } - - pub fn persist_peer(peer: &traft::Peer) -> Result<(), StorageError> { - Self::peers_access().persist_peer(peer).map_err(box_err) - } - - #[allow(dead_code)] - pub fn delete_peer(instance_id: &str) -> Result<(), StorageError> { - Self::peers_access() - .delete_peer(instance_id) - .map_err(box_err) - } - pub fn insert(space: ClusterSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> { Storage::space(space.as_str())? .insert(tuple) @@ -388,20 +333,6 @@ impl Peers { Ok(res) } -<<<<<<< HEAD -======= - /// Return an iterator over all peers. Items of the iterator are - /// specified by `F` (see `PeerFieldDef` & `peer_field` module). - #[inline(always)] - pub fn peers_fields<F>(&self) -> Result<PeersFields<F>, TraftError> - where - F: PeerFieldDef, - { - let iter = self.space().select(IteratorType::All, &())?; - Ok(PeersFields::new(iter)) - } - ->>>>>>> refactor(storage): use UnsafeCell for space_peers in Peers #[inline] pub fn peer_by_instance_id(&self, instance_id: &str) -> tarantool::Result<Option<traft::Peer>> { let tuple = self.index_instance_id.get(&(instance_id,))?; @@ -652,9 +583,10 @@ macro_rules! assert_err { inventory::submit!(crate::InnerTest { name: "test_storage_peers", body: || { - use traft::{CurrentGrade, TargetGrade}; + use traft::{CurrentGrade, TargetGrade, InstanceId}; - let mut raft_group = Storage::space(RAFT_GROUP).unwrap(); + let storage_peers = Peers::new().unwrap(); + let mut raft_group = storage_peers.space().clone(); let faildom = crate::traft::FailureDomain::from([("a", "b")]); @@ -671,22 +603,21 @@ inventory::submit!(crate::InnerTest { raft_group.put(&peer).unwrap(); } - let peers = Storage::peers().unwrap(); + let peers = storage_peers.all_peers().unwrap(); assert_eq!( peers.iter().map(|p| &p.instance_id).collect::<Vec<_>>(), vec!["i1", "i2", "i3", "i4", "i5"] ); assert_err!( - Storage::persist_peer(&traft::Peer { + storage_peers.persist_peer(&traft::Peer { raft_id: 1, instance_id: "i99".into(), ..Default::default() }), format!( concat!( - "unknown error", - " Tarantool error:", + "Tarantool error:", " TupleFound: Duplicate key exists", " in unique index \"raft_id\"", " in space \"raft_group\"", @@ -711,21 +642,21 @@ inventory::submit!(crate::InnerTest { ..Default::default() }; - Storage::persist_peer(&peer(10, "addr:collision")).unwrap(); - Storage::persist_peer(&peer(11, "addr:collision")).unwrap(); + storage_peers.persist_peer(&peer(10, "addr:collision")).unwrap(); + storage_peers.persist_peer(&peer(11, "addr:collision")).unwrap(); } - let peer_by_raft_id = |id: RaftId| Storage::peer_by_raft_id(id).unwrap().unwrap(); + let peer_by_raft_id = |id: RaftId| storage_peers.get(&id).unwrap(); { assert_eq!(peer_by_raft_id(1).instance_id, "i1"); assert_eq!(peer_by_raft_id(2).instance_id, "i2"); assert_eq!(peer_by_raft_id(3).instance_id, "i3"); assert_eq!(peer_by_raft_id(4).instance_id, "i4"); assert_eq!(peer_by_raft_id(5).instance_id, "i5"); - assert_eq!(Storage::peer_by_raft_id(6), Ok(None)); + assert_err!(storage_peers.get(&6), "peer with id 6 not found"); } - let peer_by_instance_id = |iid| Storage::peer_by_instance_id(iid).unwrap().unwrap(); + let peer_by_instance_id = |iid: &str| storage_peers.get(&InstanceId::from(iid)).unwrap(); { assert_eq!(peer_by_instance_id("i1").peer_address, "addr:1"); assert_eq!(peer_by_instance_id("i2").peer_address, "addr:2"); @@ -736,11 +667,14 @@ inventory::submit!(crate::InnerTest { peer_by_instance_id("i10").peer_address, peer_by_instance_id("i11").peer_address ); - assert_eq!(Storage::peer_by_instance_id("i6"), Ok(None)); + assert_err!( + storage_peers.get(&InstanceId::from("i6")), + "peer with id \"i6\" not found" + ); } let box_replication = |replicaset_id: &str, max_index: Option<RaftIndex>| { - Storage::box_replication(replicaset_id, max_index).unwrap() + storage_peers.replicaset_peer_addresses(replicaset_id, max_index).unwrap() }; { @@ -763,10 +697,9 @@ inventory::submit!(crate::InnerTest { raft_group.index("raft_id").unwrap().drop().unwrap(); assert_err!( - Storage::peer_by_raft_id(1), + storage_peers.get(&1), concat!( - "unknown error", - " Tarantool error: NoSuchIndexID: No index #1 is defined", + "Tarantool error: NoSuchIndexID: No index #1 is defined", " in space 'raft_group'", ) ); @@ -774,10 +707,9 @@ inventory::submit!(crate::InnerTest { raft_group.index("replicaset_id").unwrap().drop().unwrap(); assert_err!( - Storage::box_replication("", None), + storage_peers.replicaset_peer_addresses("", None), concat!( - "unknown error", - " Tarantool error: NoSuchIndexID: No index #2 is defined", + "Tarantool error: NoSuchIndexID: No index #2 is defined", " in space 'raft_group'", ) ); @@ -785,10 +717,9 @@ inventory::submit!(crate::InnerTest { raft_group.primary_key().drop().unwrap(); assert_err!( - Storage::peer_by_instance_id("i1"), + storage_peers.get(&InstanceId::from("i1")), concat!( - "unknown error", - " Tarantool error: NoSuchIndexID: No index #0 is defined", + "Tarantool error: NoSuchIndexID: No index #0 is defined", " in space 'raft_group'", ) ); @@ -796,9 +727,9 @@ inventory::submit!(crate::InnerTest { raft_group.drop().unwrap(); assert_err!( - Storage::peers(), + storage_peers.all_peers(), format!( - "unknown error Tarantool error: NoSuchSpace: Space '{}' does not exist", + "Tarantool error: NoSuchSpace: Space '{}' does not exist", raft_group.id(), ) );