diff --git a/src/main.rs b/src/main.rs index e2e497ee4456adc63cd04896e8739795aa6d6da5..48881283eb5ace808147124e4eb79bb2b5c514d2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -13,6 +13,7 @@ use std::convert::TryFrom; use std::time::{Duration, Instant}; use traft::storage::{RaftSpace, RaftStateKey}; use traft::ExpelRequest; +use traft::RaftSpaceAccess; use clap::StructOpt as _; use protobuf::Message as _; @@ -189,12 +190,16 @@ fn picolib_setup(args: &args::Run) { luamod.set( "raft_log", tlua::function1( - |opts: Option<RaftLogOpts>| -> Result<Option<String>, ::raft::StorageError> { + |opts: Option<RaftLogOpts>| -> Result<Option<String>, Error> { let header = ["index", "term", "lc", "contents"]; let [index, term, lc, contents] = header; let mut rows = vec![]; let mut col_widths = header.map(|h| h.len()); - for entry in traft::Storage::all_traft_entries()? { + let node = traft::node::global()?; + let entries = node + .all_traft_entries() + .map_err(|e| Error::Other(Box::new(e)))?; + for entry in entries { let row = [ entry.index.to_string(), entry.term.to_string(), @@ -493,13 +498,15 @@ fn main_run(args: args::Run) -> ! { } } -fn init_common(args: &args::Run, cfg: &tarantool::Cfg) { +fn init_common(args: &args::Run, cfg: &tarantool::Cfg) -> RaftSpaceAccess { std::fs::create_dir_all(&args.data_dir).unwrap(); tarantool::set_cfg(cfg); traft::Storage::init_schema(); + let storage = RaftSpaceAccess::new().expect("RaftSpaceAccess initialization failed"); init_handlers(); traft::event::init(); + storage } fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { @@ -519,15 +526,15 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { ..Default::default() }; - init_common(args, &cfg); + let storage = init_common(args, &cfg); discovery::init_global(&args.peers); cfg.listen = Some(args.listen.clone()); tarantool::set_cfg(&cfg); // TODO assert traft::Storage::instance_id == (null || args.instance_id) - if traft::Storage::raft_id().unwrap().is_some() { - return postjoin(args); + if storage.raft_id().unwrap().is_some() { + return postjoin(args, storage); } let role = discovery::wait_global(); @@ -581,7 +588,7 @@ fn start_boot(args: &args::Run) { ..Default::default() }; - init_common(args, &cfg); + let mut storage = init_common(args, &cfg); start_transaction(|| -> Result<(), TntError> { let cs = raft::ConfState { @@ -650,18 +657,21 @@ fn start_boot(args: &args::Run) { raft::Entry::try_from(e).unwrap() }); - traft::Storage::persist_conf_state(&cs).unwrap(); - traft::Storage::persist_entries(&init_entries).unwrap(); - traft::Storage::persist_commit(init_entries.len() as _).unwrap(); - traft::Storage::persist_term(1).unwrap(); - traft::Storage::persist_raft_id(raft_id).unwrap(); - traft::Storage::persist_instance_id(&instance_id).unwrap(); - traft::Storage::persist_cluster_id(&args.cluster_id).unwrap(); + storage.persist_conf_state(&cs).unwrap(); + storage.persist_entries(&init_entries).unwrap(); + storage.persist_raft_id(raft_id).unwrap(); + storage.persist_instance_id(&instance_id).unwrap(); + storage.persist_cluster_id(&args.cluster_id).unwrap(); + + let mut hs = raft::HardState::default(); + hs.set_commit(init_entries.len() as _); + hs.set_term(1); + storage.persist_hard_state(&hs).unwrap(); Ok(()) }) .unwrap(); - postjoin(args) + postjoin(args, storage) } fn start_join(args: &args::Run, leader_address: String) { @@ -719,7 +729,7 @@ fn start_join(args: &args::Run, leader_address: String) { ..Default::default() }; - init_common(args, &cfg); + let mut storage = init_common(args, &cfg); let raft_id = resp.peer.raft_id; start_transaction(|| -> Result<(), TntError> { @@ -727,17 +737,17 @@ fn start_join(args: &args::Run, leader_address: String) { for peer in resp.raft_group { traft::Storage::persist_peer(&peer).unwrap(); } - traft::Storage::persist_raft_id(raft_id).unwrap(); - traft::Storage::persist_instance_id(&resp.peer.instance_id).unwrap(); - traft::Storage::persist_cluster_id(&args.cluster_id).unwrap(); + storage.persist_raft_id(raft_id).unwrap(); + storage.persist_instance_id(&resp.peer.instance_id).unwrap(); + storage.persist_cluster_id(&args.cluster_id).unwrap(); Ok(()) }) .unwrap(); - postjoin(args) + postjoin(args, storage) } -fn postjoin(args: &args::Run) { +fn postjoin(args: &args::Run, storage: RaftSpaceAccess) { tlog!(Info, ">>>>> postjoin()"); let mut box_cfg = tarantool::cfg().unwrap(); @@ -747,8 +757,8 @@ fn postjoin(args: &args::Run) { box_cfg.replication_connect_quorum = 0; tarantool::set_cfg(&box_cfg); - let raft_id = traft::Storage::raft_id().unwrap().unwrap(); - let applied = traft::Storage::applied().unwrap().unwrap_or(0); + let raft_id = storage.raft_id().unwrap().unwrap(); + let applied = storage.applied().unwrap().unwrap_or(0); let raft_cfg = raft::Config { id: raft_id, applied, @@ -756,12 +766,12 @@ fn postjoin(args: &args::Run) { ..Default::default() }; - let node = traft::node::Node::new(&raft_cfg); + let node = traft::node::Node::new(&raft_cfg, storage.clone()); let node = node.expect("failed initializing raft node"); traft::node::set_global(node); let node = traft::node::global().unwrap(); - let cs = traft::Storage::conf_state().unwrap(); + let cs = storage.conf_state().unwrap(); if cs.voters == [raft_cfg.id] { tlog!( Info, @@ -813,7 +823,8 @@ fn postjoin(args: &args::Run) { let peer = traft::Storage::peer_by_raft_id(raft_id) .unwrap() .expect("peer must be persisted at the time of postjoin"); - let cluster_id = traft::Storage::cluster_id() + let cluster_id = storage + .cluster_id() .unwrap() .expect("cluster_id must be persisted at the time of postjoin"); diff --git a/src/traft/failover.rs b/src/traft/failover.rs index 1eb5aa65a54184bec059d91eec050e9493b7a088..f53f2ac2489ad8f8b19b58e1d1211be926bb9289 100644 --- a/src/traft/failover.rs +++ b/src/traft/failover.rs @@ -14,17 +14,34 @@ use crate::traft::Storage; use crate::traft::{UpdatePeerRequest, UpdatePeerResponse}; pub fn on_shutdown() { - let voters = Storage::voters().expect("failed reading voters"); - let active_learners = Storage::active_learners().expect("failed reading active learners"); - let raft_id = node::global().unwrap().status().id; - - if voters == [raft_id] && active_learners.is_empty() { + let node = node::global().unwrap(); + let voters = node + .storage + .voters() + .expect("failed reading voters") + .unwrap_or_default(); + let learners = node + .storage + .learners() + .expect("failed reading learners") + .unwrap_or_default(); + let have_active_learners = learners.into_iter().any(|raft_id| { + Storage::peer_by_raft_id(raft_id) + .expect("failed reading peer") + .map(|peer| peer.is_active()) + .unwrap_or(false) + }); + let raft_id = node.status().id; + + if voters == [raft_id] && !have_active_learners { tlog!(Warning, "the last active instance has shut down"); return; } let peer = Storage::peer_by_raft_id(raft_id).unwrap().unwrap(); - let cluster_id = Storage::cluster_id() + let cluster_id = node + .storage + .cluster_id() .unwrap() .expect("cluster_id must be present"); let req = UpdatePeerRequest::new(peer.instance_id, cluster_id).with_grade(Grade::Offline); @@ -79,7 +96,10 @@ fn raft_update_peer( ) -> Result<UpdatePeerResponse, Box<dyn std::error::Error>> { let node = node::global()?; - let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; + let cluster_id = node + .storage + .cluster_id()? + .ok_or("cluster_id is not set yet")?; if req.cluster_id != cluster_id { return Err(Box::new(Error::ClusterIdMismatch { @@ -94,11 +114,19 @@ fn raft_update_peer( pub fn voters_needed(voters: usize, total: usize) -> i64 { let voters_expected = match total { + 0 => { + crate::warn_or_panic!("`voters_needed` was called with `total` = 0"); + 0 + } 1 => 1, 2 => 2, 3..=4 => 3, 5.. => 5, - _ => unreachable!(), + _ => unreachable!( + "just another thing rust is garbage at: + `5..` covers all the rest of the values, + but rust can't figure this out" + ), }; voters_expected - (voters as i64) } @@ -115,5 +143,7 @@ mod tests { assert_eq!(super::voters_needed(6, 4), -3); assert_eq!(super::voters_needed(1, 5), 4); assert_eq!(super::voters_needed(1, 999), 4); + assert_eq!(super::voters_needed(0, usize::MAX), 5); + assert_eq!(super::voters_needed(0, u64::MAX as _), 5); } } diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 1371bd1cdfe01c98d9b8f67bf6d90a37af6a9648..831be6cb6d772b684b12120e01d2a5270eab391e 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -30,6 +30,7 @@ use protobuf::Message as _; pub use network::ConnectionPool; use storage::RaftSpace; pub use storage::Storage; +pub use storage2::RaftSpaceAccess; pub use topology::Topology; pub type RaftId = u64; diff --git a/src/traft/node.rs b/src/traft/node.rs index 04bf3dc94bb6e28e6fa21e97bf76950f9e12ca06..9ecd1386559ee598e6d904323ea8daf66437d363 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -44,17 +44,17 @@ use crate::traft::notify::Notify; use crate::traft::ConnectionPool; use crate::traft::LogicalClock; use crate::traft::Op; -use crate::traft::Storage; use crate::traft::Topology; use crate::traft::TopologyRequest; use crate::traft::{ ExpelRequest, ExpelResponse, JoinRequest, JoinResponse, SyncRaftRequest, UpdatePeerRequest, }; +use crate::traft::{RaftSpaceAccess, Storage}; use super::Grade; use super::OpResult; -type RawNode = raft::RawNode<Storage>; +type RawNode = raft::RawNode<RaftSpaceAccess>; #[derive(Clone, Debug, tlua::Push, tlua::PushInto)] pub struct Status { @@ -73,6 +73,7 @@ pub struct Status { pub struct Node { raw_node: Rc<Mutex<RawNode>>, raft_id: RaftId, + pub(super) storage: RaftSpaceAccess, _main_loop: fiber::UnitJoinHandle<'static>, _conf_change_loop: fiber::UnitJoinHandle<'static>, status: Rc<RefCell<Status>>, @@ -95,8 +96,8 @@ impl Node { /// Initialize the raft node. /// **This function yields** - pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { - let raw_node = RawNode::new(cfg, Storage, &tlog::root())?; + pub fn new(cfg: &raft::Config, mut storage: RaftSpaceAccess) -> Result<Self, RaftError> { + let raw_node = RawNode::new(cfg, storage.clone(), &tlog::root())?; let raw_node = Rc::new(Mutex::new(raw_node)); let status = Rc::new(RefCell::new(Status { id: cfg.id, @@ -110,14 +111,16 @@ impl Node { let main_loop_fn = { let status = status.clone(); let raw_node = raw_node.clone(); + let storage = storage.clone(); let raft_loop_cond = raft_loop_cond.clone(); let notifications = notifications.clone(); - move || raft_main_loop(status, raw_node, raft_loop_cond, notifications) + move || raft_main_loop(status, raw_node, storage, raft_loop_cond, notifications) }; let conf_change_loop_fn = { let status = status.clone(); - move || raft_conf_change_loop(status) + let storage = storage.clone(); + move || raft_conf_change_loop(status, storage) }; let node = Node { @@ -138,11 +141,12 @@ impl Node { .unwrap(), topology_cache: CachedCell::new(), lc: { - let id = Storage::raft_id().unwrap().unwrap(); - let gen = Storage::gen().unwrap().unwrap_or(0) + 1; - Storage::persist_gen(gen).unwrap(); + let id = storage.raft_id().unwrap().unwrap(); + let gen = storage.gen().unwrap().unwrap_or(0) + 1; + storage.persist_gen(gen).unwrap(); Cell::new(Some(LogicalClock::new(id, gen))) }, + storage, }; // Wait for the node to enter the main loop @@ -415,6 +419,11 @@ impl Node { self.notifications.borrow_mut().insert(lc, tx); (lc, rx) } + + #[inline] + pub fn all_traft_entries(&self) -> ::tarantool::Result<Vec<traft::Entry>> { + self.storage.all_traft_entries() + } } #[derive(Debug)] @@ -443,6 +452,7 @@ fn handle_committed_entries( entries: Vec<raft::Entry>, notifications: &mut HashMap<LogicalClock, Notify>, raw_node: &mut RawNode, + storage: &mut RaftSpaceAccess, pool: &mut ConnectionPool, topology_changed: &mut bool, ) { @@ -467,13 +477,13 @@ fn handle_committed_entries( raw_node, ), raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { - handle_committed_conf_change(entry, raw_node) + handle_committed_conf_change(entry, raw_node, storage) } } } if let Some(last_entry) = entries.last() { - if let Err(e) = Storage::persist_applied(last_entry.index) { + if let Err(e) = storage.persist_applied(last_entry.index) { tlog!( Error, "error persisting applied index: {e}"; @@ -527,7 +537,11 @@ fn handle_committed_normal_entry( }); } -fn handle_committed_conf_change(entry: traft::Entry, raw_node: &mut RawNode) { +fn handle_committed_conf_change( + entry: traft::Entry, + raw_node: &mut RawNode, + storage: &mut RaftSpaceAccess, +) { let latch_unlock = || { with_joint_state_latch(|joint_state_latch| { if let Some(latch) = joint_state_latch.take() { @@ -574,7 +588,7 @@ fn handle_committed_conf_change(entry: traft::Entry, raw_node: &mut RawNode) { }; let raft_id = &raw_node.raft.id; - let voters_old = Storage::voters().unwrap(); + let voters_old = storage.voters().unwrap().unwrap_or_default(); if voters_old.contains(raft_id) && !conf_state.voters.contains(raft_id) { if is_joint { event::broadcast_when(Event::Demoted, Event::JointStateLeave).ok(); @@ -583,7 +597,7 @@ fn handle_committed_conf_change(entry: traft::Entry, raw_node: &mut RawNode) { } } - Storage::persist_conf_state(&conf_state).unwrap(); + storage.persist_conf_state(&conf_state).unwrap(); } fn handle_read_states( @@ -617,6 +631,7 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { fn raft_main_loop( status: Rc<RefCell<Status>>, raw_node: Rc<Mutex<RawNode>>, + mut storage: RaftSpaceAccess, raft_loop_cond: Rc<Cond>, notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>, ) { @@ -673,6 +688,7 @@ fn raft_main_loop( committed_entries, &mut *notifications.borrow_mut(), &mut raw_node, + &mut storage, &mut pool, &mut topology_changed, ); @@ -680,13 +696,13 @@ fn raft_main_loop( if !ready.entries().is_empty() { let e = ready.entries(); // Append entries to the Raft log. - Storage::persist_entries(e).unwrap(); + storage.persist_entries(e).unwrap(); } if let Some(hs) = ready.hs() { // Raft HardState changed, and we need to persist it. // let hs = hs.clone(); - Storage::persist_hard_state(hs).unwrap(); + storage.persist_hard_state(hs).unwrap(); } if let Some(ss) = ready.ss() { @@ -715,7 +731,7 @@ fn raft_main_loop( // Update commit index. start_transaction(|| -> Result<(), TransactionError> { if let Some(commit) = light_rd.commit_index() { - Storage::persist_commit(commit).unwrap(); + storage.persist_commit(commit).unwrap(); } // Send out the messages. @@ -728,6 +744,7 @@ fn raft_main_loop( committed_entries, &mut *notifications.borrow_mut(), &mut raw_node, + &mut storage, &mut pool, &mut topology_changed, ); @@ -751,16 +768,18 @@ fn raft_main_loop( } } -fn raft_conf_change_loop(status: Rc<RefCell<Status>>) { +fn raft_conf_change_loop(status: Rc<RefCell<Status>>, storage: RaftSpaceAccess) { loop { if status.borrow().raft_state != "Leader" { event::wait(Event::StatusChanged).expect("Events system must be initialized"); continue; } - let term = Storage::term().unwrap().unwrap_or(0); - let voter_ids: HashSet<RaftId> = HashSet::from_iter(Storage::voters().unwrap()); - let learner_ids: HashSet<RaftId> = HashSet::from_iter(Storage::learners().unwrap()); + let term = storage.term().unwrap().unwrap_or(0); + let voter_ids: HashSet<RaftId> = + HashSet::from_iter(storage.voters().unwrap().unwrap_or_default()); + let learner_ids: HashSet<RaftId> = + HashSet::from_iter(storage.learners().unwrap().unwrap_or_default()); let peer_is_active: HashMap<RaftId, bool> = Storage::peers() .unwrap() .into_iter() @@ -905,7 +924,10 @@ fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::E fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> { let node = global()?; - let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; + let cluster_id = node + .storage + .cluster_id()? + .ok_or("cluster_id is not set yet")?; if req.cluster_id != cluster_id { return Err(Box::new(Error::ClusterIdMismatch { @@ -920,7 +942,7 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error // A joined peer needs to communicate with other nodes. // Provide it the list of raft voters in response. let mut raft_group = vec![]; - for raft_id in Storage::voters()?.into_iter() { + for raft_id in node.storage.voters()?.unwrap_or_default().into_iter() { if let Some(peer) = Storage::peer_by_raft_id(raft_id)? { raft_group.push(peer); } else { @@ -944,7 +966,10 @@ pub fn expel_wrapper(instance_id: String) -> Result<(), traft::error::Error> { } fn expel_by_instance_id(instance_id: String) -> Result<ExpelResponse, Box<dyn std::error::Error>> { - let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; + let cluster_id = global()? + .storage + .cluster_id()? + .ok_or("cluster_id is not set yet")?; expel(ExpelRequest { instance_id, @@ -979,7 +1004,10 @@ fn expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> } fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> { - let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; + let cluster_id = global()? + .storage + .cluster_id()? + .ok_or("cluster_id is not set yet")?; if req.cluster_id != cluster_id { return Err(Box::new(Error::ClusterIdMismatch { @@ -1007,7 +1035,7 @@ fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::erro fn raft_sync_raft(req: SyncRaftRequest) -> Result<(), Box<dyn std::error::Error>> { let deadline = Instant::now() + req.timeout; loop { - if Storage::commit().unwrap().unwrap() >= req.commit { + if global()?.storage.commit().unwrap().unwrap() >= req.commit { return Ok(()); } @@ -1036,7 +1064,7 @@ fn call_raft_sync_raft(promotee: &Peer, commit: u64) -> Result<(), Box<dyn std:: // Run on Leader by topology governor fn sync_raft(promotee: &Peer) -> Result<(), Box<dyn std::error::Error>> { - let commit = Storage::commit().unwrap().unwrap(); + let commit = global()?.storage.commit().unwrap().unwrap(); match call_raft_sync_raft(promotee, commit) { Ok(_) => { @@ -1049,7 +1077,10 @@ fn sync_raft(promotee: &Peer) -> Result<(), Box<dyn std::error::Error>> { } let instance_id = promotee.instance_id.clone(); - let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; + let cluster_id = node + .storage + .cluster_id()? + .ok_or("cluster_id is not set yet")?; let req = UpdatePeerRequest::new(instance_id, cluster_id).with_grade(Grade::RaftSynced); diff --git a/src/traft/storage.rs b/src/traft/storage.rs index fe309882253cacfb12af6241ddb34fb5676e7377..462854f6049682ecb21f0d783beed0e4600f11cb 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -1,23 +1,15 @@ -use std::convert::TryFrom; - -use ::raft::prelude as raft; -use ::raft::Error as RaftError; use ::raft::StorageError; use ::raft::INVALID_ID; use ::tarantool::index::IteratorType; use ::tarantool::space::Space; use ::tarantool::tuple::{ToTupleBuffer, Tuple}; -use ::tarantool::unwrap_or; use serde::de::DeserializeOwned; -use serde::Serialize; use thiserror::Error; use crate::define_str_enum; -use crate::tlog; use crate::traft; use crate::traft::RaftId; use crate::traft::RaftIndex; -use crate::traft::RaftTerm; pub struct Storage; @@ -43,7 +35,6 @@ pub struct UnknownRaftSpace(pub String); // TODO(gmoshkin): remove this const RAFT_GROUP: &str = RaftSpace::Group.as_str(); const RAFT_STATE: &str = RaftSpace::State.as_str(); -const RAFT_LOG: &str = RaftSpace::Log.as_str(); //////////////////////////////////////////////////////////////////////////////// // RaftStateKey @@ -93,37 +84,6 @@ impl Storage { pub fn init_schema() { crate::tarantool::eval( r#" - box.schema.space.create('raft_log', { - if_not_exists = true, - is_local = true, - format = { - {name = 'entry_type', type = 'unsigned', is_nullable = false}, - {name = 'index', type = 'unsigned', is_nullable = false}, - {name = 'term', type = 'unsigned', is_nullable = false}, - {name = 'data', type = 'any', is_nullable = true}, - {name = 'context', type = 'any', is_nullable = true}, - } - }) - box.space.raft_log:create_index('pk', { - if_not_exists = true, - parts = {{'index'}}, - }) - - - box.schema.space.create('raft_state', { - if_not_exists = true, - is_local = true, - format = { - {name = 'key', type = 'string', is_nullable = false}, - {name = 'value', type = 'any', is_nullable = false}, - } - }) - box.space.raft_state:create_index('pk', { - if_not_exists = true, - parts = {{'key'}}, - }) - - box.schema.space.create('raft_group', { if_not_exists = true, is_local = true, @@ -166,13 +126,6 @@ impl Storage { .map_err(box_err) } - fn persist_raft_state<T: Serialize>(key: &str, value: T) -> Result<(), StorageError> { - Storage::space(RAFT_STATE)? - .put(&(key, value)) - .map_err(box_err)?; - Ok(()) - } - fn raft_state<T: DeserializeOwned>(key: &str) -> Result<Option<T>, StorageError> { let tuple: Option<Tuple> = Storage::space(RAFT_STATE)?.get(&(key,)).map_err(box_err)?; @@ -263,94 +216,10 @@ impl Storage { Ok(ret) } - pub fn raft_id() -> Result<Option<RaftId>, StorageError> { - Storage::raft_state("raft_id") - } - - #[allow(dead_code)] - pub fn instance_id() -> Result<Option<String>, StorageError> { - Storage::raft_state("instance_id") - } - - pub fn cluster_id() -> Result<Option<String>, StorageError> { - Storage::raft_state("cluster_id") - } - - /// Node generation i.e. the number of restarts. - pub fn gen() -> Result<Option<u64>, StorageError> { - Storage::raft_state("gen") - } - - pub fn term() -> Result<Option<RaftTerm>, StorageError> { - Storage::raft_state("term") - } - - pub fn vote() -> Result<Option<RaftId>, StorageError> { - Storage::raft_state("vote") - } - - pub fn commit() -> Result<Option<RaftIndex>, StorageError> { - Storage::raft_state("commit") - } - - pub fn applied() -> Result<Option<RaftIndex>, StorageError> { - Storage::raft_state("applied") - } - pub fn replication_factor() -> Result<Option<u8>, StorageError> { Storage::raft_state(RaftStateKey::ReplicationFactor.as_str()) } - pub fn persist_commit(commit: RaftIndex) -> Result<(), StorageError> { - // tlog!(Info, "++++++ persist commit {commit}"); - Storage::persist_raft_state("commit", commit) - } - - pub fn persist_applied(applied: RaftIndex) -> Result<(), StorageError> { - Storage::persist_raft_state("applied", applied) - } - - pub fn persist_term(term: RaftTerm) -> Result<(), StorageError> { - Storage::persist_raft_state("term", term) - } - - pub fn persist_vote(vote: RaftId) -> Result<(), StorageError> { - Storage::persist_raft_state("vote", vote) - } - - pub fn persist_gen(gen: u64) -> Result<(), StorageError> { - Storage::persist_raft_state("gen", gen) - } - - pub fn persist_raft_id(id: RaftId) -> Result<(), StorageError> { - Storage::space(RAFT_STATE)? - // We use `insert` instead of `replace` here - // because `raft_id` can never be changed. - .insert(&("raft_id", id)) - .map_err(box_err)?; - - Ok(()) - } - - pub fn persist_instance_id(id: &str) -> Result<(), StorageError> { - Storage::space(RAFT_STATE)? - // We use `insert` instead of `replace` here - // because `instance_id` can never be changed. - .insert(&("instance_id", id)) - .map_err(box_err)?; - - Ok(()) - } - - pub fn persist_cluster_id(id: &str) -> Result<(), StorageError> { - Storage::space(RAFT_STATE)? - // We use `insert` instead of `replace` here - // because `cluster_id` should never be changed. - .insert(&("cluster_id", id)) - .map_err(box_err)?; - Ok(()) - } - pub fn persist_peer(peer: &traft::Peer) -> Result<(), StorageError> { Storage::space(RAFT_GROUP)?.replace(peer).map_err(box_err)?; @@ -366,103 +235,6 @@ impl Storage { Ok(()) } - pub fn entries(low: RaftIndex, high: RaftIndex) -> Result<Vec<raft::Entry>, StorageError> { - // idx \in [low, high) - let mut ret: Vec<raft::Entry> = vec![]; - let iter = Storage::space(RAFT_LOG)? - .select(IteratorType::GE, &(low,)) - .map_err(box_err)?; - - for tuple in iter { - let row: traft::Entry = tuple.decode().map_err(box_err)?; - if row.index >= high { - break; - } - ret.push(row.into()); - } - - Ok(ret) - } - - pub fn all_traft_entries() -> Result<Vec<traft::Entry>, StorageError> { - Storage::space(RAFT_LOG)? - .select(IteratorType::All, &()) - .map_err(box_err)? - .map(|tuple| tuple.decode().map_err(box_err)) - .collect() - } - - pub fn persist_entries(entries: &[raft::Entry]) -> Result<(), StorageError> { - let mut space = Storage::space(RAFT_LOG)?; - for e in entries { - let row = traft::Entry::try_from(e).unwrap(); - space.replace(&row).map_err(box_err)?; - } - - Ok(()) - } - - pub fn voters() -> Result<Vec<RaftId>, StorageError> { - Ok(Storage::raft_state("voters")?.unwrap_or_default()) - } - - pub fn learners() -> Result<Vec<RaftId>, StorageError> { - Ok(Storage::raft_state("learners")?.unwrap_or_default()) - } - - pub fn active_learners() -> Result<Vec<RaftId>, StorageError> { - let learners = Storage::learners()?; - let mut res = Vec::with_capacity(learners.len()); - for raft_id in learners { - if unwrap_or!(Storage::peer_by_raft_id(raft_id)?, continue).is_active() { - res.push(raft_id) - } - } - Ok(res) - } - - pub fn conf_state() -> Result<raft::ConfState, StorageError> { - Ok(raft::ConfState { - voters: Storage::voters()?, - learners: Storage::learners()?, - voters_outgoing: Storage::raft_state("voters_outgoing")?.unwrap_or_default(), - learners_next: Storage::raft_state("learners_next")?.unwrap_or_default(), - auto_leave: Storage::raft_state("auto_leave")?.unwrap_or_default(), - ..Default::default() - }) - } - - pub fn persist_conf_state(cs: &raft::ConfState) -> Result<(), StorageError> { - Storage::persist_raft_state("voters", &cs.voters)?; - Storage::persist_raft_state("learners", &cs.learners)?; - Storage::persist_raft_state("voters_outgoing", &cs.voters_outgoing)?; - Storage::persist_raft_state("learners_next", &cs.learners_next)?; - Storage::persist_raft_state("auto_leave", &cs.auto_leave)?; - Ok(()) - } - - pub fn hard_state() -> Result<raft::HardState, StorageError> { - let mut ret = raft::HardState::default(); - if let Some(term) = Storage::term()? { - ret.term = term; - } - if let Some(vote) = Storage::vote()? { - ret.vote = vote; - } - if let Some(commit) = Storage::commit()? { - ret.commit = commit; - } - - Ok(ret) - } - - pub fn persist_hard_state(hs: &raft::HardState) -> Result<(), StorageError> { - Storage::persist_term(hs.term)?; - Storage::persist_vote(hs.vote)?; - Storage::persist_commit(hs.commit)?; - Ok(()) - } - pub fn insert(space: RaftSpace, tuple: &impl ToTupleBuffer) -> Result<Tuple, StorageError> { Storage::space(space.as_str())? .insert(tuple) @@ -496,190 +268,12 @@ impl Storage { } } -impl raft::Storage for Storage { - fn initial_state(&self) -> Result<raft::RaftState, RaftError> { - // See also: https://github.com/etcd-io/etcd/blob/main/raft/raftpb/raft.pb.go - let hs = Storage::hard_state()?; - let cs = Storage::conf_state()?; - - let ret = raft::RaftState::new(hs, cs); - Ok(ret) - } - - fn entries( - &self, - low: RaftIndex, - high: RaftIndex, - _max_size: impl Into<Option<u64>>, - ) -> Result<Vec<raft::Entry>, RaftError> { - // tlog!(Info, "++++++ entries {low} {high}"); - Ok(Storage::entries(low, high)?) - } - - fn term(&self, idx: RaftIndex) -> Result<RaftTerm, RaftError> { - if idx == 0 { - return Ok(0); - } - // tlog!(Info, "++++++ term {idx}"); - - let tuple = Storage::space(RAFT_LOG)?.get(&(idx,)).map_err(box_err)?; - - if let Some(tuple) = tuple { - Ok(tuple.field(2).map_err(box_err)?.unwrap()) - } else { - Err(RaftError::Store(StorageError::Unavailable)) - } - } - - fn first_index(&self) -> Result<RaftIndex, RaftError> { - // tlog!(Info, "++++++ first_index"); - Ok(1) - } - - fn last_index(&self) -> Result<RaftIndex, RaftError> { - let space: Space = Storage::space(RAFT_LOG)?; - let tuple: Option<Tuple> = space.primary_key().max(&()).map_err(box_err)?; - - if let Some(t) = tuple { - Ok(t.field(1).map_err(box_err)?.unwrap()) - } else { - Ok(0) - } - } - - fn snapshot(&self, idx: RaftIndex) -> Result<raft::Snapshot, RaftError> { - tlog!(Critical, "snapshot"; "request_index" => idx); - unimplemented!(); - - // Ok(Storage::snapshot()?) - } -} - macro_rules! assert_err { ($expr:expr, $err:expr) => { assert_eq!($expr.map_err(|e| format!("{e}")), Err($err.into())) }; } -inventory::submit!(crate::InnerTest { - name: "test_storage_log", - body: || { - use ::raft::Storage as _; - let test_entries = vec![raft::Entry { - term: 9u64, - index: 99u64, - ..Default::default() - }]; - - Storage::persist_entries(&test_entries).unwrap(); - - assert_eq!(Storage.first_index(), Ok(1)); - assert_eq!(Storage.last_index(), Ok(99)); - assert_eq!(Storage.term(99), Ok(9)); - assert_eq!(Storage.entries(1, 99, u64::MAX), Ok(vec![])); - assert_eq!(Storage.entries(1, 100, u64::MAX), Ok(test_entries)); - - assert_eq!( - Storage.term(100).map_err(|e| format!("{e}")), - Err("log unavailable".into()) - ); - - let mut raft_log = Storage::space("raft_log").unwrap(); - - raft_log.put(&(1337, 99, 1, "", ())).unwrap(); - assert_err!( - Storage.entries(1, 100, u64::MAX), - "unknown error Failed to decode tuple: unknown entry type (1337)" - ); - - raft_log.put(&(0, 99, 1, "", false)).unwrap(); - assert_err!( - Storage.entries(1, 100, u64::MAX), - concat!( - "unknown error", - " Failed to decode tuple:", - " data did not match any variant", - " of untagged enum EntryContext" - ) - ); - - raft_log.primary_key().drop().unwrap(); - assert_err!( - Storage.entries(1, 100, u64::MAX), - concat!( - "unknown error", - " Tarantool error:", - " NoSuchIndexID:", - " No index #0 is defined in space 'raft_log'" - ) - ); - - raft_log.drop().unwrap(); - assert_err!( - Storage.entries(1, 100, u64::MAX), - "unknown error no such space \"raft_log\"" - ); - } -}); - -inventory::submit!(crate::InnerTest { - name: "test_storage_state", - body: || { - use ::raft::Storage as _; - - Storage::persist_term(9u64).unwrap(); - Storage::persist_vote(1u64).unwrap(); - Storage::persist_commit(98u64).unwrap(); - Storage::persist_applied(97u64).unwrap(); - - let state = Storage.initial_state().unwrap(); - assert_eq!( - state.hard_state, - raft::HardState { - term: 9, - vote: 1, - commit: 98, - ..Default::default() - } - ); - - let mut raft_state = Storage::space("raft_state").unwrap(); - - raft_state.delete(&("id",)).unwrap(); - assert_eq!(Storage::raft_id(), Ok(None)); - - Storage::persist_raft_id(16).unwrap(); - assert_err!( - Storage::persist_raft_id(32), - concat!( - "unknown error", - " Tarantool error:", - " TupleFound:", - " Duplicate key exists in unique index \"pk\" in space \"raft_state\"", - " with old tuple - [\"raft_id\", 16]", - " and new tuple - [\"raft_id\", 32]" - ) - ); - - raft_state.primary_key().drop().unwrap(); - assert_err!( - Storage::term(), - concat!( - "unknown error", - " Tarantool error:", - " NoSuchIndexID:", - " No index #0 is defined in space 'raft_state'" - ) - ); - - raft_state.drop().unwrap(); - assert_err!( - Storage::commit(), - "unknown error no such space \"raft_state\"" - ); - } -}); - #[rustfmt::skip] inventory::submit!(crate::InnerTest { name: "test_storage_peers", diff --git a/src/traft/storage2.rs b/src/traft/storage2.rs index 7791850e2f12d7dd1796e5c7693fe0493d9e80da..be5d0cf552ece612c73a680653acd459df0bd09a 100644 --- a/src/traft/storage2.rs +++ b/src/traft/storage2.rs @@ -23,27 +23,32 @@ pub struct RaftSpaceAccess { } macro_rules! state_impl { - ($vis:vis getter, $fn_name:ident, $value_type:ty) => { - $vis fn $fn_name(&self) -> tarantool::Result<Option<$value_type>> { - let key: &str = stringify!($fn_name); - let tuple: Option<Tuple> = self.space_raft_state.get(&(key,))?; - - match tuple { - Some(t) => t.field(Self::FIELD_STATE_VALUE), - None => Ok(None), + ( + $( + $(#[$get_meta:meta])* $get_vis:vis $property:ident: $get_ty:ty; + $(#[$set_meta:meta])* $set_vis:vis $setter:ident: $mod:ident, $set_ty:ty; + )+ + ) => { + $( + $(#[$get_meta])* + $get_vis fn $property(&self) -> tarantool::Result<Option<$get_ty>> { + let key: &str = stringify!($property); + let tuple: Option<Tuple> = self.space_raft_state.get(&(key,))?; + + match tuple { + Some(t) => t.field(Self::FIELD_STATE_VALUE), + None => Ok(None), + } } - } - }; - // Q: Why passing `fn_name` and `key` separately instead of using `concat!()`? - // A: It doesn't work. See https://github.com/rust-lang/rust/issues/12249. - // Also, it's easier to grep full names. - ($vis:vis setter, $fn_name: ident, $key:literal, $modifier:ident, $value_type: ty) => { - $vis fn $fn_name(&mut self, value: $value_type) -> tarantool::Result<()> { - self.space_raft_state.$modifier(&($key, value))?; - Ok(()) - } - }; + $(#[$set_meta])* + $set_vis fn $setter(&mut self, value: $set_ty) -> tarantool::Result<()> { + let key: &str = stringify!($property); + self.space_raft_state.$mod(&(key, value))?; + Ok(()) + } + )+ + } } impl RaftSpaceAccess { @@ -58,6 +63,7 @@ impl RaftSpaceAccess { let space_raft_log = Space::builder(Self::SPACE_RAFT_LOG) .is_local(true) + .is_temporary(false) .field(Field::unsigned("entry_type")) .field(Field::unsigned("index")) .field(Field::unsigned("term")) @@ -75,6 +81,7 @@ impl RaftSpaceAccess { let space_raft_state = Space::builder(Self::SPACE_RAFT_STATE) .is_local(true) + .is_temporary(false) .field(Field::string("key")) .field(Field::any("value")) .if_not_exists(true) @@ -96,18 +103,48 @@ impl RaftSpaceAccess { // Find the meaning of the fields here: // https://github.com/etcd-io/etcd/blob/main/raft/raftpb/raft.pb.go - state_impl!(pub getter, raft_id, u64); - state_impl!(pub getter, cluster_id, String); - // state_impl!(pub getter, gen, u64); // Node generation i.e. the number of restarts. - state_impl!(getter, term, RaftTerm); - state_impl!(getter, vote, RaftId); - state_impl!(getter, commit, RaftIndex); - // state_impl!(pub getter, applied, RaftIndex); - state_impl!(getter, voters, Vec<RaftId>); - state_impl!(getter, learners, Vec<RaftId>); - state_impl!(getter, voters_outgoing, Vec<RaftId>); - state_impl!(getter, learners_next, Vec<RaftId>); - state_impl!(getter, auto_leave, bool); + state_impl! { + pub raft_id: u64; + pub persist_raft_id: insert, u64; + + #[allow(dead_code)] + pub instance_id: String; + pub persist_instance_id: insert, &str; + + pub cluster_id: String; + pub persist_cluster_id: insert, &str; + + /// Node generation i.e. the number of restarts. + pub gen: u64; + pub persist_gen: replace, u64; + + pub(super) term: RaftTerm; + persist_term: replace, RaftTerm; + + vote: RaftId; + persist_vote: replace, RaftId; + + pub(super) commit: RaftIndex; + pub persist_commit: replace, RaftIndex; + + pub applied: RaftIndex; + pub persist_applied: replace, RaftIndex; + + pub(super) voters: Vec<RaftId>; + persist_voters: replace, &[RaftId]; + + pub(super) learners: Vec<RaftId>; + persist_learners: replace, &[RaftId]; + + voters_outgoing: Vec<RaftId>; + persist_voters_outgoing: replace, &[RaftId]; + + learners_next: Vec<RaftId>; + persist_learners_next: replace, &[RaftId]; + + auto_leave: bool; + persist_auto_leave: replace, bool; + } pub fn conf_state(&self) -> tarantool::Result<raft::ConfState> { Ok(raft::ConfState { @@ -145,31 +182,12 @@ impl RaftSpaceAccess { Ok(ret) } - state_impl!(pub setter, persist_raft_id, "raft_id", insert, u64); - state_impl!(pub setter, persist_cluster_id, "cluster_id", insert, &str); - // state_impl!(pub setter, persist_gen, "gen", replace, u64); - state_impl!(setter, persist_term, "term", replace, RaftTerm); - state_impl!(setter, persist_vote, "vote", replace, RaftId); - state_impl!(setter, persist_commit, "commit", replace, RaftIndex); - // state_impl!(pub setter, persist_applied, "applied", replace, RaftIndex); - - state_impl!(setter, persist_voters, "voters", replace, &Vec<RaftId>); - state_impl!(setter, persist_learners, "learners", replace, &Vec<RaftId>); - state_impl!( - setter, - persist_voters_outgoing, - "voters_outgoing", - replace, - &Vec<RaftId> - ); - state_impl!( - setter, - persist_learners_next, - "learners_next", - replace, - &Vec<RaftId> - ); - state_impl!(setter, persist_auto_leave, "auto_leave", replace, bool); + pub fn all_traft_entries(&self) -> tarantool::Result<Vec<traft::Entry>> { + self.space_raft_log + .select(IteratorType::All, &())? + .map(|tuple| tuple.decode()) + .collect() + } pub fn persist_conf_state(&mut self, cs: &raft::ConfState) -> tarantool::Result<()> { self.persist_voters(&cs.voters)?;