From ec8605f3254dc820ce32c4afb4677cc20c28788e Mon Sep 17 00:00:00 2001 From: Yaroslav Dynnikov <yaroslav.dynnikov@gmail.com> Date: Tue, 26 Jul 2022 16:17:23 +0300 Subject: [PATCH] refacor: give u64 fields more meaningful type names Everywhere in code use `RaftId`, `RaftTerm`, `RaftIndex` instead of obscure `u64`. --- src/main.rs | 6 +++--- src/traft/mod.rs | 12 ++++++----- src/traft/network.rs | 2 +- src/traft/node.rs | 20 +++++++++++------- src/traft/storage.rs | 48 ++++++++++++++++++++++--------------------- src/traft/topology.rs | 2 +- 6 files changed, 50 insertions(+), 40 deletions(-) diff --git a/src/main.rs b/src/main.rs index 02ff4af331..9df95cbf17 100644 --- a/src/main.rs +++ b/src/main.rs @@ -15,7 +15,7 @@ use std::time::{Duration, Instant}; use clap::StructOpt as _; use protobuf::Message as _; -use crate::traft::LogicalClock; +use crate::traft::{LogicalClock, RaftIndex}; use traft::error::Error; mod app; @@ -64,7 +64,7 @@ fn picolib_setup(args: &args::Run) { ); luamod.set( "raft_read_index", - tlua::function1(|timeout: f64| -> Result<u64, Error> { + tlua::function1(|timeout: f64| -> Result<RaftIndex, Error> { traft::node::global()?.read_index(Duration::from_secs_f64(timeout)) }), ); @@ -99,7 +99,7 @@ fn picolib_setup(args: &args::Run) { ); luamod.set( "raft_return_one", - tlua::function1(|timeout: f64| -> Result<u64, Error> { + tlua::function1(|timeout: f64| -> Result<u8, Error> { traft::node::global()?.propose(traft::OpReturnOne, Duration::from_secs_f64(timeout)) }), ); diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 3b4ffb4f7d..008234af4e 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -28,6 +28,8 @@ pub use storage::Storage; pub use topology::Topology; pub type RaftId = u64; +pub type RaftTerm = u64; +pub type RaftIndex = u64; pub type InstanceId = String; pub type ReplicasetId = String; @@ -147,7 +149,7 @@ impl From<OpReturnOne> for Op { pub struct OpReturnOne; impl OpResult for OpReturnOne { - type Result = u64; + type Result = u8; fn result(&self) -> Self::Result { 1 } @@ -168,7 +170,7 @@ pub struct Peer { /// Used for identifying raft nodes. /// Must be unique in the raft group. - pub raft_id: u64, + pub raft_id: RaftId, /// Inbound address used for communication with the node. /// Not to be confused with listen address. @@ -180,7 +182,7 @@ pub struct Peer { /// Index of the most recent raft log entry that persisted this peer. /// `0` means it's not committed yet. - pub commit_index: u64, + pub commit_index: RaftIndex, /// The state of this instance's activity. pub health: Health, @@ -235,8 +237,8 @@ pub struct Entry { /// ``` #[serde(with = "entry_type_as_i32")] pub entry_type: raft::EntryType, - pub index: u64, - pub term: u64, + pub index: RaftIndex, + pub term: RaftTerm, /// Corresponding `entry.data`. Solely managed by `raft-rs`. #[serde(with = "serde_bytes")] diff --git a/src/traft/network.rs b/src/traft/network.rs index 8cb4de5463..f8c7ac6b91 100644 --- a/src/traft/network.rs +++ b/src/traft/network.rs @@ -274,7 +274,7 @@ inventory::submit!(crate::InnerTest { tlog!(Info, "TEST: connecting {listen}"); pool.connect(1337, listen); - let heartbeat_to_from = |to: u64, from: u64| raft::Message { + let heartbeat_to_from = |to: RaftId, from: RaftId| raft::Message { msg_type: raft::MessageType::MsgHeartbeat, to, from, diff --git a/src/traft/node.rs b/src/traft/node.rs index 89480211f1..4c7ba9517b 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -27,6 +27,8 @@ use crate::stringify_cfunc; use crate::traft::ContextCoercion as _; use crate::traft::Peer; use crate::traft::RaftId; +use crate::traft::RaftIndex; +use crate::traft::RaftTerm; use ::tarantool::util::IntoClones as _; use protobuf::Message as _; use std::iter::FromIterator as _; @@ -54,9 +56,9 @@ type RawNode = raft::RawNode<Storage>; #[derive(Clone, Debug, tlua::Push, tlua::PushInto)] pub struct Status { /// `raft_id` of the current instance - pub id: u64, + pub id: RaftId, /// `raft_id` of the leader instance - pub leader_id: Option<u64>, + pub leader_id: Option<RaftId>, /// One of "Follower", "Candidate", "Leader", "PreCandidate" pub raft_state: String, /// Whether instance has finished its `postjoin` @@ -72,7 +74,7 @@ pub struct Node { _conf_change_loop: fiber::UnitJoinHandle<'static>, status: Rc<RefCell<Status>>, notifications: Rc<RefCell<HashMap<LogicalClock, Notify>>>, - topology_cache: CachedCell<u64, Topology>, + topology_cache: CachedCell<RaftTerm, Topology>, lc: Cell<Option<LogicalClock>>, } @@ -153,7 +155,7 @@ impl Node { event::wait(Event::StatusChanged).expect("Events system wasn't initialized"); } - pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> { + pub fn read_index(&self, timeout: Duration) -> Result<RaftIndex, Error> { self.raw_operation(|raw_node| { // In some states `raft-rs` ignores the ReadIndex request. // Check it preliminary, don't wait for the timeout. @@ -176,7 +178,7 @@ impl Node { raw_node.read_index(ctx.to_bytes()); Ok(notify) })? - .recv_timeout::<u64>(timeout) + .recv_timeout::<RaftIndex>(timeout) } pub fn propose<T: OpResult + Into<traft::Op>>( @@ -292,7 +294,11 @@ impl Node { .recv::<Peer>() } - fn propose_conf_change(&self, term: u64, conf_change: raft::ConfChangeV2) -> Result<(), Error> { + fn propose_conf_change( + &self, + term: RaftTerm, + conf_change: raft::ConfChangeV2, + ) -> Result<(), Error> { self.raw_operation(|raw_node| { // In some states proposing a ConfChange is impossible. // Check if there's a reason to reject it. @@ -385,7 +391,7 @@ struct JointStateLatch { /// Index of the latest ConfChange entry proposed. /// Helps detecting when the entry is overridden /// due to a re-election. - index: u64, + index: RaftIndex, /// Make a notification when the latch is unlocked. /// Notification is a `Result<Box<()>>`. diff --git a/src/traft/storage.rs b/src/traft/storage.rs index d9490e0896..fe560a4e8c 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -15,6 +15,8 @@ use thiserror::Error; use crate::tlog; use crate::traft; use crate::traft::RaftId; +use crate::traft::RaftIndex; +use crate::traft::RaftTerm; pub struct Storage; @@ -130,7 +132,7 @@ impl Storage { } } - pub fn peer_by_raft_id(raft_id: u64) -> Result<Option<traft::Peer>, StorageError> { + pub fn peer_by_raft_id(raft_id: RaftId) -> Result<Option<traft::Peer>, StorageError> { if raft_id == INVALID_ID { unreachable!("peer_by_raft_id called with invalid id ({})", INVALID_ID); } @@ -181,7 +183,7 @@ impl Storage { pub fn box_replication( replicaset_id: &str, - max_index: Option<u64>, + max_index: Option<RaftIndex>, ) -> Result<Vec<String>, StorageError> { let mut ret = Vec::new(); @@ -211,7 +213,7 @@ impl Storage { Ok(ret) } - pub fn raft_id() -> Result<Option<u64>, StorageError> { + pub fn raft_id() -> Result<Option<RaftId>, StorageError> { Storage::raft_state("raft_id") } @@ -229,19 +231,19 @@ impl Storage { Storage::raft_state("gen") } - pub fn term() -> Result<Option<u64>, StorageError> { + pub fn term() -> Result<Option<RaftTerm>, StorageError> { Storage::raft_state("term") } - pub fn vote() -> Result<Option<u64>, StorageError> { + pub fn vote() -> Result<Option<RaftId>, StorageError> { Storage::raft_state("vote") } - pub fn commit() -> Result<Option<u64>, StorageError> { + pub fn commit() -> Result<Option<RaftIndex>, StorageError> { Storage::raft_state("commit") } - pub fn applied() -> Result<Option<u64>, StorageError> { + pub fn applied() -> Result<Option<RaftIndex>, StorageError> { Storage::raft_state("applied") } @@ -253,20 +255,20 @@ impl Storage { Storage::persist_raft_state("replication_factor", replication_factor) } - pub fn persist_commit(commit: u64) -> Result<(), StorageError> { + pub fn persist_commit(commit: RaftIndex) -> Result<(), StorageError> { // tlog!(Info, "++++++ persist commit {commit}"); Storage::persist_raft_state("commit", commit) } - pub fn persist_applied(applied: u64) -> Result<(), StorageError> { + pub fn persist_applied(applied: RaftIndex) -> Result<(), StorageError> { Storage::persist_raft_state("applied", applied) } - pub fn persist_term(term: u64) -> Result<(), StorageError> { + pub fn persist_term(term: RaftTerm) -> Result<(), StorageError> { Storage::persist_raft_state("term", term) } - pub fn persist_vote(vote: u64) -> Result<(), StorageError> { + pub fn persist_vote(vote: RaftId) -> Result<(), StorageError> { Storage::persist_raft_state("vote", vote) } @@ -274,7 +276,7 @@ impl Storage { Storage::persist_raft_state("gen", gen) } - pub fn persist_raft_id(id: u64) -> Result<(), StorageError> { + pub fn persist_raft_id(id: RaftId) -> Result<(), StorageError> { Storage::space(RAFT_STATE)? // We use `insert` instead of `replace` here // because `raft_id` can never be changed. @@ -312,7 +314,7 @@ impl Storage { } #[allow(dead_code)] - pub fn delete_peer(raft_id: u64) -> Result<(), StorageError> { + pub fn delete_peer(raft_id: RaftId) -> Result<(), StorageError> { Storage::space(RAFT_GROUP)? .delete(&[raft_id]) .map_err(box_err!())?; @@ -320,7 +322,7 @@ impl Storage { Ok(()) } - pub fn entries(low: u64, high: u64) -> Result<Vec<raft::Entry>, StorageError> { + pub fn entries(low: RaftIndex, high: RaftIndex) -> Result<Vec<raft::Entry>, StorageError> { // idx \in [low, high) let mut ret: Vec<raft::Entry> = vec![]; let iter = Storage::space(RAFT_LOG)? @@ -430,15 +432,15 @@ impl raft::Storage for Storage { fn entries( &self, - low: u64, - high: u64, + low: RaftIndex, + high: RaftIndex, _max_size: impl Into<Option<u64>>, ) -> Result<Vec<raft::Entry>, RaftError> { // tlog!(Info, "++++++ entries {low} {high}"); Ok(Storage::entries(low, high)?) } - fn term(&self, idx: u64) -> Result<u64, RaftError> { + fn term(&self, idx: RaftIndex) -> Result<RaftTerm, RaftError> { if idx == 0 { return Ok(0); } @@ -453,12 +455,12 @@ impl raft::Storage for Storage { } } - fn first_index(&self) -> Result<u64, RaftError> { + fn first_index(&self) -> Result<RaftIndex, RaftError> { // tlog!(Info, "++++++ first_index"); Ok(1) } - fn last_index(&self) -> Result<u64, RaftError> { + fn last_index(&self) -> Result<RaftIndex, RaftError> { let space: Space = Storage::space(RAFT_LOG)?; let tuple: Option<Tuple> = space.primary_key().max(&()).map_err(box_err!())?; @@ -469,7 +471,7 @@ impl raft::Storage for Storage { } } - fn snapshot(&self, idx: u64) -> Result<raft::Snapshot, RaftError> { + fn snapshot(&self, idx: RaftIndex) -> Result<raft::Snapshot, RaftError> { tlog!(Critical, "snapshot"; "request_index" => idx); unimplemented!(); @@ -666,7 +668,7 @@ inventory::submit!(crate::InnerTest { { // Ensure traft storage doesn't impose restrictions // on peer_address uniqueness. - let peer = |id: u64, addr: &str| traft::Peer { + let peer = |id: RaftId, addr: &str| traft::Peer { raft_id: id, instance_id: format!("i{id}"), peer_address: addr.into(), @@ -677,7 +679,7 @@ inventory::submit!(crate::InnerTest { Storage::persist_peer(&peer(11, "addr:collision")).unwrap(); } - let peer_by_raft_id = |id: u64| Storage::peer_by_raft_id(id).unwrap().unwrap(); + let peer_by_raft_id = |id: RaftId| Storage::peer_by_raft_id(id).unwrap().unwrap(); { assert_eq!(peer_by_raft_id(1).instance_id, "i1"); assert_eq!(peer_by_raft_id(2).instance_id, "i2"); @@ -701,7 +703,7 @@ inventory::submit!(crate::InnerTest { assert_eq!(Storage::peer_by_instance_id("i6"), Ok(None)); } - let box_replication = |replicaset_id: &str, max_index: Option<u64>| { + let box_replication = |replicaset_id: &str, max_index: Option<RaftIndex>| { Storage::box_replication(replicaset_id, max_index).unwrap() }; diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 4e6f6ee114..4085b55e09 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -62,7 +62,7 @@ impl Topology { .insert(instance_id); } - fn choose_instance_id(&self, raft_id: u64) -> String { + fn choose_instance_id(&self, raft_id: RaftId) -> String { let mut suffix: Option<u64> = None; loop { let ret = match suffix { -- GitLab