diff --git a/src/cache.rs b/src/cache.rs new file mode 100644 index 0000000000000000000000000000000000000000..3c1e65a6cbf867d93426d52d2864dbae9adb50f9 --- /dev/null +++ b/src/cache.rs @@ -0,0 +1,28 @@ +use std::cell::Cell; + +pub struct CachedCell<K, T> { + cell: Cell<Option<(K, T)>>, +} + +impl<K, T> CachedCell<K, T> +where + K: PartialEq, +{ + pub fn new() -> Self { + Self { + cell: Cell::default(), + } + } + + pub fn put(&self, key: K, value: T) { + self.cell.set(Some((key, value))); + } + + pub fn pop(&self, key: &K) -> Option<T> { + match self.cell.take() { + Some((k, v)) if k == *key => Some(v), + Some(_) => None, + None => None, + } + } +} diff --git a/src/main.rs b/src/main.rs index 630222dea8380f92285880ec73d09440a850fce2..d048dc0b720a7dc97aefa6b7a65bb2b61db058d6 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,13 +10,16 @@ use ::tarantool::fiber; use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; use std::convert::TryFrom; -use std::time::{Duration, Instant}; +use std::time::Duration; use clap::StructOpt as _; use protobuf::Message as _; +use crate::traft::{EntryContextNormal, LogicalClock}; + mod app; mod args; +mod cache; mod discovery; mod ipc; mod mailbox; @@ -411,7 +414,6 @@ fn start_boot(args: &args::Run) { tlog!(Info, ">>>>> start_boot()"); let peer = traft::topology::initial_peer( - args.cluster_id.clone(), args.instance_id(), args.replicaset_id.clone(), args.advertise_address(), @@ -444,27 +446,42 @@ fn start_boot(args: &args::Run) { ..Default::default() }; - let entry = { + let e1 = { + let ctx = traft::EntryContextNormal { + op: traft::Op::PersistPeer { peer }, + lc: LogicalClock::new(raft_id, 0), + }; + let e = traft::Entry { + entry_type: raft::EntryType::EntryNormal, + index: 1, + term: 1, + data: vec![], + context: Some(traft::EntryContext::Normal(ctx)), + }; + + raft::Entry::try_from(e).unwrap() + }; + + let e2 = { let conf_change = raft::ConfChange { change_type: raft::ConfChangeType::AddNode, node_id: raft_id, ..Default::default() }; - let ctx = traft::EntryContextConfChange { peers: vec![peer] }; let e = traft::Entry { entry_type: raft::EntryType::EntryConfChange, - index: 1, + index: 2, term: 1, data: conf_change.write_to_bytes().unwrap(), - context: Some(traft::EntryContext::ConfChange(ctx)), + context: None, }; raft::Entry::try_from(e).unwrap() }; traft::Storage::persist_conf_state(&cs).unwrap(); - traft::Storage::persist_entries(&[entry]).unwrap(); - traft::Storage::persist_commit(1).unwrap(); + traft::Storage::persist_entries(&[e1, e2]).unwrap(); + traft::Storage::persist_commit(2).unwrap(); traft::Storage::persist_term(1).unwrap(); traft::Storage::persist_id(raft_id).unwrap(); traft::Storage::persist_cluster_id(&args.cluster_id).unwrap(); @@ -557,7 +574,7 @@ fn postjoin(args: &args::Run) { ); node.tick(1); // apply configuration, if any - node.campaign(); // trigger election immediately + node.campaign().ok(); // trigger election immediately assert_eq!(node.status().raft_state, "Leader"); } @@ -589,42 +606,42 @@ fn postjoin(args: &args::Run) { box_cfg.replication = traft::Storage::box_replication(&peer.replicaset_id, None).unwrap(); tarantool::set_cfg(&box_cfg); - loop { - let timeout = Duration::from_millis(220); - let me = traft::Storage::peer_by_raft_id(raft_id) - .unwrap() - .expect("peer not found"); - - if me.voter && me.peer_address == args.advertise_address() { - // already ok - break; - } - - tlog!(Warning, "initiating self-promotion of {me:?}"); - let req = traft::JoinRequest { - cluster_id: args.cluster_id.clone(), - instance_id: Some(me.instance_id.clone()), - replicaset_id: None, // TODO - voter: true, - advertise_address: args.advertise_address(), - }; - - let leader_id = node.status().leader_id.expect("leader_id deinitialized"); - let leader = traft::Storage::peer_by_raft_id(leader_id).unwrap().unwrap(); - - let fn_name = stringify_cfunc!(traft::node::raft_join); - let now = Instant::now(); - match tarantool::net_box_call(&leader.peer_address, fn_name, &req, timeout) { - Err(e) => { - tlog!(Error, "failed to promote myself: {e}"); - fiber::sleep(timeout.saturating_sub(now.elapsed())); - continue; - } - Ok(traft::JoinResponse { .. }) => { - break; - } - }; - } + // loop { + // let timeout = Duration::from_millis(220); + // let me = traft::Storage::peer_by_raft_id(raft_id) + // .unwrap() + // .expect("peer not found"); + + // if me.active && me.peer_address == args.advertise_address() { + // // already ok + // break; + // } + + // tlog!(Warning, "initiating self-promotion of {me:?}"); + // let req = traft::JoinRequest { + // cluster_id: args.cluster_id.clone(), + // instance_id: Some(me.instance_id.clone()), + // replicaset_id: None, // TODO + // voter: true, + // advertise_address: args.advertise_address(), + // }; + + // let leader_id = node.status().leader_id.expect("leader_id deinitialized"); + // let leader = traft::Storage::peer_by_raft_id(leader_id).unwrap().unwrap(); + + // let fn_name = stringify_cfunc!(traft::node::raft_join); + // let now = Instant::now(); + // match tarantool::net_box_call(&leader.peer_address, fn_name, &req, timeout) { + // Err(e) => { + // tlog!(Error, "failed to promote myself: {e}"); + // fiber::sleep(timeout.saturating_sub(now.elapsed())); + // continue; + // } + // Ok(traft::JoinResponse { .. }) => { + // break; + // } + // }; + // } node.mark_as_ready(); } diff --git a/src/traft/failover.rs b/src/traft/failover.rs index a59eed5e72b37ecfddb6783575107245fd3e5d85..c9510fbd37d8a1e7ed150fd51df2dbe25ccd9a3a 100644 --- a/src/traft/failover.rs +++ b/src/traft/failover.rs @@ -7,7 +7,7 @@ use crate::{stringify_cfunc, tarantool, tlog}; use crate::traft::node; use crate::traft::node::Error; use crate::traft::Storage; -use crate::traft::{DeactivateRequest, DeactivateResponse}; +use crate::traft::{SetActiveRequest, SetActiveResponse}; pub fn on_shutdown() { let voters = Storage::voters().expect("failed reading 'voters'"); @@ -19,11 +19,12 @@ pub fn on_shutdown() { } let peer = Storage::peer_by_raft_id(raft_id).unwrap().unwrap(); - let req = DeactivateRequest { + let req = SetActiveRequest { instance_id: peer.instance_id, cluster_id: Storage::cluster_id() .unwrap() .expect("cluster_id must be present"), + active: false, }; let fn_name = stringify_cfunc!(raft_deactivate); @@ -42,7 +43,7 @@ pub fn on_shutdown() { ); continue; } - Ok(DeactivateResponse { .. }) => { + Ok(SetActiveResponse { .. }) => { break; } }; @@ -50,9 +51,7 @@ pub fn on_shutdown() { } #[proc(packed_args)] -fn raft_deactivate( - req: DeactivateRequest, -) -> Result<DeactivateResponse, Box<dyn std::error::Error>> { +fn raft_deactivate(req: SetActiveRequest) -> Result<SetActiveResponse, Box<dyn std::error::Error>> { let node = node::global()?; let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; @@ -64,7 +63,6 @@ fn raft_deactivate( })); } - node.change_topology(req)?; - - Ok(DeactivateResponse {}) + let peer = node.handle_topology_request(req.into())?; + Ok(SetActiveResponse { peer }) } diff --git a/src/traft/mod.rs b/src/traft/mod.rs index 010598b3852e9be0c84bb4966f21dd0cece6008f..18858bce79f8b3797590d5170c93d579d196a500 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -22,6 +22,8 @@ pub use storage::Storage; pub use topology::Topology; pub type RaftId = u64; +pub type InstanceId = String; +pub type ReplicasetId = String; ////////////////////////////////////////////////////////////////////////////////////////// /// Timestamps for raft entries. @@ -57,11 +59,18 @@ pub enum Op { /// No operation. Nop, /// Print the message in tarantool log. - Info { msg: String }, + Info { + msg: String, + }, /// Evaluate the code on every instance in cluster. - EvalLua { code: String }, + EvalLua { + code: String, + }, /// ReturnOne(OpReturnOne), + PersistPeer { + peer: Peer, + }, } impl Op { @@ -77,6 +86,10 @@ impl Op { Box::new(()) } Self::ReturnOne(op) => Box::new(op.result()), + Self::PersistPeer { peer } => { + Storage::persist_peer(peer).unwrap(); + Box::new(peer.clone()) + } } } } @@ -111,23 +124,27 @@ pub trait OpResult { /// Serializable struct representing a member of the raft group. #[derive(Clone, Debug, Default, Serialize, Deserialize, PartialEq)] pub struct Peer { + /// Instances are identified by name. + pub instance_id: String, + pub instance_uuid: String, + /// Used for identifying raft nodes. /// Must be unique in the raft group. pub raft_id: u64, + /// Inbound address used for communication with the node. /// Not to be confused with listen address. pub peer_address: String, - /// Reflects the role of the node in the raft group. - /// Non-voters are also called learners in terms of raft. - pub voter: bool, - pub instance_id: String, + + /// Name of a replicaset the instance belongs to. pub replicaset_id: String, - pub instance_uuid: String, pub replicaset_uuid: String, - /// `0` means it's not committed yet. + + /// Index in the raft log. `0` means it's not committed yet. pub commit_index: u64, + /// Is this instance active. Instances become inactive when they shut down. - pub is_active: bool, + pub active: bool, } impl AsTuple for Peer {} @@ -369,7 +386,7 @@ pub trait ContextCoercion: Serialize + DeserializeOwned { #[derive(Clone, Debug, Serialize, Deserialize)] pub enum TopologyRequest { Join(JoinRequest), - Deactivate(DeactivateRequest), + SetActive(SetActiveRequest), } impl From<JoinRequest> for TopologyRequest { @@ -378,9 +395,9 @@ impl From<JoinRequest> for TopologyRequest { } } -impl From<DeactivateRequest> for TopologyRequest { - fn from(d: DeactivateRequest) -> Self { - Self::Deactivate(d) +impl From<SetActiveRequest> for TopologyRequest { + fn from(a: SetActiveRequest) -> Self { + Self::SetActive(a) } } @@ -412,17 +429,20 @@ impl AsTuple for JoinResponse {} /////////////////////////////////////////////////////////////////////////////// /// Request to deactivate the instance. #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct DeactivateRequest { - pub instance_id: String, +pub struct SetActiveRequest { pub cluster_id: String, + pub instance_id: String, + pub active: bool, } -impl AsTuple for DeactivateRequest {} +impl AsTuple for SetActiveRequest {} /////////////////////////////////////////////////////////////////////////////// /// Response to a [`DeactivateRequest`] #[derive(Clone, Debug, Serialize, Deserialize)] -pub struct DeactivateResponse {} -impl AsTuple for DeactivateResponse {} +pub struct SetActiveResponse { + pub peer: Peer, +} +impl AsTuple for SetActiveResponse {} /////////////////////////////////////////////////////////////////////////////// lazy_static::lazy_static! { diff --git a/src/traft/node.rs b/src/traft/node.rs index 3210ee0280a34b693df9a2090eadc821b6b3a710..40097a2790a7a74e062837cb7a010f7540007701 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -18,8 +18,8 @@ use std::any::type_name; use std::any::Any; use std::cell::RefCell; use std::collections::HashMap; +use std::collections::HashSet; use std::convert::TryFrom; -use std::error::Error as StdError; use std::rc::Rc; use std::time::Duration; use std::time::Instant; @@ -27,8 +27,11 @@ use thiserror::Error; use crate::traft::ContextCoercion as _; use crate::traft::Peer; +use crate::traft::RaftId; use ::tarantool::util::IntoClones as _; use protobuf::Message as _; +use protobuf::ProtobufEnum as _; +use std::iter::FromIterator as _; use crate::mailbox::Mailbox; use crate::tlog; @@ -38,12 +41,12 @@ use crate::traft::LogicalClock; use crate::traft::Storage; use crate::traft::Topology; use crate::traft::TopologyRequest; -use crate::traft::{JoinRequest, JoinResponse}; +use crate::traft::{JoinRequest, JoinResponse, SetActiveRequest}; use super::OpResult; type RawNode = raft::RawNode<Storage>; -type TopologyMailbox = Mailbox<(TopologyRequest, Notify)>; +// type TopologyMailbox = Mailbox<(TopologyRequest, Notify)>; #[derive(Clone)] struct Notify { @@ -96,6 +99,7 @@ impl Notify { Ok(*boxed) } + #[allow(unused)] fn recv<T: 'static>(self) -> Result<T, Error> { let any: Box<dyn Any> = self.recv_any()?; let boxed: Box<T> = any.downcast().map_err(|_| Error::DowncastError)?; @@ -137,7 +141,10 @@ pub struct Status { pub id: u64, /// `raft_id` of the leader instance pub leader_id: Option<u64>, + /// One of "Follower", "Candidate", "Leader", "PreCandidate" pub raft_state: String, + /// Whether instance has finished its `postjoin` + /// initialization stage pub is_ready: bool, } @@ -145,43 +152,75 @@ pub struct Status { #[derive(Debug)] pub struct Node { _main_loop: fiber::UnitJoinHandle<'static>, - _join_loop: fiber::UnitJoinHandle<'static>, + _conf_change_loop: fiber::UnitJoinHandle<'static>, main_inbox: Mailbox<NormalRequest>, - join_inbox: TopologyMailbox, + // join_inbox: TopologyMailbox, status: Rc<RefCell<Status>>, - status_cond: Rc<fiber::Cond>, + wake_up_status_observers: Rc<fiber::Cond>, + wake_up_join_loop: Rc<fiber::Cond>, } /// A request to the raft main loop. #[derive(Clone, Debug)] enum NormalRequest { /// Propose `raft::prelude::Entry` of `EntryNormal` kind. + /// /// Make a notification when it's committed. + /// Notify the caller with `Result<Box<_>>`, + /// of the associated type `traft::Op::Result`. + /// ProposeNormal { op: traft::Op, notify: Notify }, /// Propose `raft::prelude::Entry` of `EntryConfChange` kind. + /// /// Make a notification when the second EntryConfChange is /// committed (that corresponds to the leaving of a joint state). + /// Notify the caller with `Result<Box<()>>`. + /// ProposeConfChange { + conf_change: raft::ConfChangeV2, term: u64, - peers: Vec<traft::Peer>, - to_replace: Vec<(u64, traft::Peer)>, + notify: Notify, + }, + + /// This kind of requests is special: it can only be processed on + /// the leader, and implies corresponding `TopologyRequest` to a + /// cached `struct Topology`. As a result, it proposes the + /// `Op::PersistPeer` entry to the raft. + /// + /// Make a notification when the entry is committed. + /// Notify the caller with `Result<Box<Peer>>`. + /// + HandleTopologyRequest { + req: TopologyRequest, notify: Notify, }, /// Get a read barrier. In some systems it's also called the "quorum read". + /// /// Make a notification when index is read. + /// Notify the caller with `Result<Box<u64>>`, + /// holding the resulting `raft_index`. + /// ReadIndex { notify: Notify }, - /// Start a new raft term . + /// Start a new raft term. + /// /// Make a notification when request is processed. + /// Notify the caller with `Result<Box<()>>` + /// Campaign { notify: Notify }, /// Handle message from anoher raft node. + /// Step(raft::Message), /// Tick the node. + /// /// Make a notification when request is processed. + /// Notify the caller with `Result<(Box<()>)>`, with + /// the `Err` variant unreachable + /// Tick { n_times: u32, notify: Notify }, } @@ -189,9 +228,9 @@ impl Node { pub const TICK: Duration = Duration::from_millis(100); pub fn new(cfg: &raft::Config) -> Result<Self, RaftError> { - let status_cond = Rc::new(fiber::Cond::new()); + let wake_up_status_observers = Rc::new(fiber::Cond::new()); + let wake_up_join_loop = Rc::new(fiber::Cond::new()); let main_inbox = Mailbox::<NormalRequest>::new(); - let join_inbox = TopologyMailbox::new(); let raw_node = RawNode::new(cfg, Storage, &tlog::root())?; let status = Rc::new(RefCell::new(Status { id: cfg.id, @@ -202,30 +241,40 @@ impl Node { let main_loop_fn = { let status = status.clone(); - let status_cond = status_cond.clone(); + let wake_up_status_observers = wake_up_status_observers.clone(); let main_inbox = main_inbox.clone(); - move || raft_main_loop(main_inbox, status, status_cond, raw_node) + move || raft_main_loop(main_inbox, status, wake_up_status_observers, raw_node) }; - let join_loop_fn = { + let conf_change_loop_fn = { + let status = status.clone(); + let wake_up_status_observers = wake_up_status_observers.clone(); + let wake_up_join_loop = wake_up_join_loop.clone(); let main_inbox = main_inbox.clone(); - let join_inbox = join_inbox.clone(); - move || raft_join_loop(join_inbox, main_inbox) + move || { + raft_conf_change_loop( + status, + wake_up_status_observers, + wake_up_join_loop, + main_inbox, + ) + } }; let node = Node { main_inbox, - join_inbox, + // join_inbox, status, - status_cond, + wake_up_status_observers, + wake_up_join_loop, _main_loop: fiber::Builder::new() .name("raft_main_loop") .proc(main_loop_fn) .start() .unwrap(), - _join_loop: fiber::Builder::new() - .name("raft_join_loop") - .proc(join_loop_fn) + _conf_change_loop: fiber::Builder::new() + .name("raft_conf_change_loop") + .proc(conf_change_loop_fn) .start() .unwrap(), }; @@ -241,18 +290,18 @@ impl Node { pub fn mark_as_ready(&self) { self.status.borrow_mut().is_ready = true; - self.status_cond.broadcast(); + self.wake_up_status_observers.broadcast(); } pub fn wait_status(&self) { - self.status_cond.wait(); + self.wake_up_status_observers.wait(); } pub fn read_index(&self, timeout: Duration) -> Result<u64, Error> { let (rx, tx) = Notify::new().into_clones(); self.main_inbox .send(NormalRequest::ReadIndex { notify: tx }); - rx.recv_timeout(timeout) + rx.recv_timeout::<u64>(timeout) } pub fn propose<T: OpResult + Into<traft::Op>>( @@ -265,16 +314,14 @@ impl Node { op: op.into(), notify: tx, }); - rx.recv_timeout(timeout) + rx.recv_timeout::<T::Result>(timeout) } - pub fn campaign(&self) { + pub fn campaign(&self) -> Result<(), Error> { let (rx, tx) = Notify::new().into_clones(); let req = NormalRequest::Campaign { notify: tx }; self.main_inbox.send(req); - if let Err(e) = rx.recv_any() { - tlog!(Error, "{e}"); - } + rx.recv::<()>() } pub fn step(&self, msg: raft::Message) { @@ -289,7 +336,10 @@ impl Node { notify: tx, }; self.main_inbox.send(req); - rx.recv_any().ok(); + match rx.recv() { + Ok(()) => (), + Err(e) => tlog!(Warning, "{e}"), + } } pub fn timeout_now(&self) { @@ -299,11 +349,21 @@ impl Node { }) } - pub fn change_topology(&self, req: impl Into<TopologyRequest>) -> Result<traft::RaftId, Error> { + // pub fn change_topology(&self, req: impl Into<TopologyRequest>) -> Result<traft::RaftId, Error> { + // let (rx, tx) = Notify::new().into_clones(); + + // self.join_inbox.send((req.into(), tx)); + // rx.recv() + // } + + pub fn handle_topology_request(&self, req: TopologyRequest) -> Result<traft::Peer, Error> { let (rx, tx) = Notify::new().into_clones(); + let req = NormalRequest::HandleTopologyRequest { req, notify: tx }; - self.join_inbox.send((req.into(), tx)); - rx.recv() + self.main_inbox.send(req); + let peer = rx.recv::<Peer>()?; + self.wake_up_join_loop.broadcast(); + Ok(peer) } } @@ -334,13 +394,12 @@ fn handle_committed_entries( match entry.entry_type { raft::EntryType::EntryNormal => { - handle_committed_normal_entry(entry, notifications, joint_state_latch) + handle_committed_normal_entry(entry, notifications, pool, joint_state_latch) } raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { handle_committed_conf_change( entry, raw_node, - pool, joint_state_latch, config_changed, ) @@ -362,6 +421,7 @@ fn handle_committed_entries( fn handle_committed_normal_entry( entry: traft::Entry, notifications: &mut HashMap<LogicalClock, Notify>, + pool: &mut ConnectionPool, joint_state_latch: &mut Option<JointStateLatch>, ) { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); @@ -373,6 +433,11 @@ fn handle_committed_normal_entry( } } + if let Some(traft::Op::PersistPeer { peer }) = entry.op() { + pool.connect(peer.raft_id, peer.peer_address.clone()); + // TODO Wake up conf_change fiber + } + if let Some(latch) = joint_state_latch { if entry.index == latch.index { // It was expected to be a ConfChange entry, but it's @@ -389,19 +454,9 @@ fn handle_committed_normal_entry( fn handle_committed_conf_change( entry: traft::Entry, raw_node: &mut RawNode, - pool: &mut ConnectionPool, joint_state_latch: &mut Option<JointStateLatch>, config_changed: &mut bool, ) { - for peer in entry.iter_peers() { - let peer = traft::Peer { - commit_index: entry.index, - ..peer.clone() - }; - Storage::persist_peer_by_instance_id(&peer).unwrap(); - pool.connect(peer.raft_id, peer.peer_address); - } - // Beware: this tiny difference in type names // (`V2` or not `V2`) makes a significant // difference in `entry.data` binary layout @@ -469,7 +524,7 @@ fn handle_messages(messages: Vec<raft::Message>, pool: &ConnectionPool) { fn raft_main_loop( main_inbox: Mailbox<NormalRequest>, status: Rc<RefCell<Status>>, - status_cond: Rc<fiber::Cond>, + wake_up_status_observers: Rc<fiber::Cond>, mut raw_node: RawNode, ) { let mut next_tick = Instant::now() + Node::TICK; @@ -494,6 +549,9 @@ fn raft_main_loop( let mut joint_state_latch: Option<JointStateLatch> = None; + let topology_cache = crate::cache::CachedCell::<u64, Topology>::new(); + // let mut topology: Option<(u64, Topology)> = None; + loop { // Clean up obsolete notifications notifications.retain(|_, notify: &mut Notify| !notify.is_closed()); @@ -510,10 +568,63 @@ fn raft_main_loop( notifications.insert(lc.clone(), notify); } } + NormalRequest::HandleTopologyRequest { req, notify } => { + lc.inc(); + + let status = raw_node.status(); + if status.ss.raft_state != RaftStateRole::Leader { + let e = RaftError::ConfChangeError("not a leader".into()); + notify.notify_err(e); + continue; + } + + let mut topology = + topology_cache.pop(&raw_node.raft.term).unwrap_or_else(|| { + let peers = Storage::peers().unwrap(); + Topology::from_peers(peers).with_replication_factor(2) + }); + + let peer_result = match req { + TopologyRequest::Join(JoinRequest { + instance_id, + replicaset_id, + advertise_address, + .. + }) => topology.join(instance_id, replicaset_id, advertise_address), + + TopologyRequest::SetActive(SetActiveRequest { + instance_id, + active, + .. + }) => topology.set_active(instance_id, active), + }; + + let mut peer = match peer_result { + Ok(peer) => peer, + Err(e) => { + notify.notify_err(RaftError::ConfChangeError(e)); + topology_cache.put(raw_node.raft.term, topology); + continue; + } + }; + + peer.commit_index = raw_node.raft.raft_log.last_index() + 1; + + let ctx = traft::EntryContextNormal { + op: traft::Op::PersistPeer { peer }, + lc: lc.clone(), + }; + + if let Err(e) = raw_node.propose(ctx.to_bytes(), vec![]) { + notify.notify_err(e); + } else { + notifications.insert(lc.clone(), notify); + topology_cache.put(raw_node.raft.term, topology); + } + } NormalRequest::ProposeConfChange { + conf_change, term, - peers, - to_replace, notify, } => { // In some states proposing a ConfChange is impossible. @@ -521,9 +632,13 @@ fn raft_main_loop( #[allow(clippy::never_loop)] let reason: Option<&str> = loop { - // Raft-rs allows proposing ConfChange from any node, but it may cause - // inconsistent raft_id generation. In picodata only the leader is - // permitted to step a ProposeConfChange message. + // Checking leadership is only needed for the + // correct latch management. It doesn't affect + // raft correctness. Checking the instance is a + // leader makes sure the proposed `ConfChange` + // is appended to the raft log immediately + // instead of sending `MsgPropose` over the + // network. let status = raw_node.status(); if status.ss.raft_state != RaftStateRole::Leader { break Some("not a leader"); @@ -548,49 +663,49 @@ fn raft_main_loop( continue; } - let mut changes = Vec::with_capacity(peers.len()); - let mut new_peers: Vec<Peer> = Vec::new(); - for peer in &peers { - let change_type = match peer.voter { - true => raft::ConfChangeType::AddNode, - false => raft::ConfChangeType::AddLearnerNode, - }; - changes.push(raft::ConfChangeSingle { - change_type, - node_id: peer.raft_id, - ..Default::default() - }); - new_peers.push(peer.clone()); - } - - for (old_raft_id, peer) in &to_replace { - changes.push(raft::ConfChangeSingle { - change_type: raft::ConfChangeType::RemoveNode, - node_id: *old_raft_id, - ..Default::default() - }); - let change_type = match peer.voter { - true => raft::ConfChangeType::AddNode, - false => raft::ConfChangeType::AddLearnerNode, - }; - changes.push(raft::ConfChangeSingle { - change_type, - node_id: peer.raft_id, - ..Default::default() - }); - new_peers.push(peer.clone()); - } - - let cc = raft::ConfChangeV2 { - changes: changes.into(), - transition: raft::ConfChangeTransition::Implicit, - ..Default::default() - }; - - let ctx = traft::EntryContextConfChange { peers: new_peers }.to_bytes(); - - let prev_index = raw_node.raft.raft_log.last_index(); - if let Err(e) = raw_node.propose_conf_change(ctx, cc) { + // let mut changes = Vec::with_capacity(peers.len()); + // let mut new_peers: Vec<Peer> = Vec::new(); + // for peer in &peers { + // let change_type = match peer.active { + // true => raft::ConfChangeType::AddNode, + // false => raft::ConfChangeType::AddLearnerNode, + // }; + // changes.push(raft::ConfChangeSingle { + // change_type, + // node_id: peer.raft_id, + // ..Default::default() + // }); + // new_peers.push(peer.clone()); + // } + + // for (old_raft_id, peer) in &to_replace { + // changes.push(raft::ConfChangeSingle { + // change_type: raft::ConfChangeType::RemoveNode, + // node_id: *old_raft_id, + // ..Default::default() + // }); + // let change_type = match peer.active { + // true => raft::ConfChangeType::AddNode, + // false => raft::ConfChangeType::AddLearnerNode, + // }; + // changes.push(raft::ConfChangeSingle { + // change_type, + // node_id: peer.raft_id, + // ..Default::default() + // }); + // new_peers.push(peer.clone()); + // } + + // let cc = raft::ConfChangeV2 { + // changes: changes.into(), + // transition: raft::ConfChangeTransition::Implicit, + // ..Default::default() + // }; + + // let ctx = traft::EntryContextConfChange { peers: new_peers }.to_bytes(); + + // let prev_index = raw_node.raft.raft_log.last_index(); + if let Err(e) = raw_node.propose_conf_change(vec![], conf_change) { notify.notify_err(e); } else { // oops, current instance isn't actually a leader @@ -599,7 +714,6 @@ fn raft_main_loop( // to the raft network instead of appending it to the // raft log. let last_index = raw_node.raft.raft_log.last_index(); - assert!(last_index == prev_index + 1); joint_state_latch = Some(JointStateLatch { index: last_index, @@ -695,7 +809,7 @@ fn raft_main_loop( id => Some(id), }; status.raft_state = format!("{:?}", ss.raft_state); - status_cond.broadcast(); + wake_up_status_observers.broadcast(); } if !ready.persisted_messages().is_empty() { @@ -753,53 +867,56 @@ fn raft_main_loop( } } -fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { +fn raft_conf_change_loop( + status: Rc<RefCell<Status>>, + wake_up_status_observers: Rc<fiber::Cond>, + wake_up_join_loop: Rc<fiber::Cond>, + main_inbox: Mailbox<NormalRequest>, +) { loop { - let batch = inbox.receive_all(Duration::MAX); + if status.borrow().raft_state != "Leader" { + wake_up_status_observers.wait(); + continue; + } let term = Storage::term().unwrap().unwrap_or(0); - let mut topology = match Storage::peers() { - Ok(v) => Topology::from_peers(v).with_replication_factor(2), - Err(e) => { - for (_, notify) in batch { - let e = RaftError::ConfChangeError(format!("{e}")); - notify.notify_err(e); - } + let conf_state = Storage::conf_state().unwrap(); + let voters: HashSet<RaftId> = HashSet::from_iter(conf_state.voters); + let learners: HashSet<RaftId> = HashSet::from_iter(conf_state.learners); + let everybody: HashSet<RaftId> = voters.union(&learners).cloned().collect(); + let peers: HashMap<RaftId, bool> = Storage::peers() + .unwrap() + .iter() + .map(|peer| (peer.raft_id, peer.active)) + .collect(); + let mut changes: Vec<raft::ConfChangeSingle> = Vec::new(); + + for (node_id, _active) in peers { + if everybody.contains(&node_id) { continue; } - }; - - let mut topology_results = vec![]; - for (req, notify) in &batch { - match topology.process(req) { - Ok(peer) => { - topology_results.push((notify, peer)); - } - Err(e) => { - let e = RaftError::ConfChangeError(e); - notify.notify_err(e); - } - } + changes.push(raft::ConfChangeSingle { + change_type: raft::ConfChangeType::AddLearnerNode, + node_id, + ..Default::default() + }); } - let topology_diff = topology.diff(); - let topology_to_replace = topology.to_replace(); - - let mut ids: Vec<String> = vec![]; - for peer in &topology_diff { - ids.push(peer.instance_id.clone()); - } - for (_, peer) in &topology_to_replace { - ids.push(peer.instance_id.clone()); + if changes.is_empty() { + wake_up_join_loop.wait(); + continue; } - tlog!(Info, "processing batch: {ids:?}"); + + let conf_change = raft::ConfChangeV2 { + changes: changes.into(), + ..Default::default() + }; let (rx, tx) = Notify::new().into_clones(); main_inbox.send(NormalRequest::ProposeConfChange { term, - peers: topology_diff, - to_replace: topology_to_replace, + conf_change, notify: tx, }); @@ -807,18 +924,9 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { // will sometimes be handled and there's no need in timeout. // It also guarantees that the notification will arrive only // after the node leaves the joint state. - let res = rx.recv::<u64>(); - tlog!(Info, "batch processed: {ids:?}, {res:?}"); - for (notify, peer) in topology_results { - match &res { - Ok(_) => notify.notify_ok(peer.raft_id), - Err(e) => { - // RaftError doesn't implement the Clone trait, - // so we have to be creative. - let e = RaftError::ConfChangeError(format!("{e}")); - notify.notify_err(e); - } - }; + match rx.recv() { + Ok(()) => tlog!(Debug, "conf_change processed"), + Err(e) => tlog!(Warning, "conf_change failed: {e}"), } } } @@ -844,7 +952,7 @@ pub fn global() -> Result<&'static Node, Error> { } #[proc(packed_args)] -fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn StdError>> { +fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn std::error::Error>> { let node = global()?; for pb in pbs { node.step(raft::Message::try_from(pb)?); @@ -853,7 +961,7 @@ fn raft_interact(pbs: Vec<traft::MessagePb>) -> Result<(), Box<dyn StdError>> { } #[proc(packed_args)] -fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> { +fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error>> { let node = global()?; let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; @@ -865,9 +973,7 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> { })); } - let raft_id = node.change_topology(req)?; - - let peer = Storage::peer_by_raft_id(raft_id)?.ok_or("the peer has misteriously disappeared")?; + let peer = node.handle_topology_request(req.into())?; let raft_group = Storage::peers()?; let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?; diff --git a/src/traft/storage.rs b/src/traft/storage.rs index 66ee4957b33c2d749f529c3b2fe29f0941e487ed..29a6f6238ce382a91977550640443eed017cbbac 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -72,15 +72,14 @@ impl Storage { if_not_exists = true, is_local = true, format = { + {name = 'instance_id', type = 'string', is_nullable = false}, + {name = 'instance_uuid', type = 'string', is_nullable = false}, {name = 'raft_id', type = 'unsigned', is_nullable = false}, {name = 'peer_address', type = 'string', is_nullable = false}, - {name = 'voter', type = 'boolean', is_nullable = false}, - {name = 'instance_id', type = 'string', is_nullable = false}, {name = 'replicaset_id', type = 'string', is_nullable = false}, - {name = 'instance_uuid', type = 'string', is_nullable = false}, {name = 'replicaset_uuid', type = 'string', is_nullable = false}, {name = 'commit_index', type = 'unsigned', is_nullable = false}, - {name = 'is_active', type = 'boolean', is_nullable = false}, + {name = 'active', type = 'boolean', is_nullable = false}, } }) diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 086e4883553290ee7f82a621c757c80a1cbeb955..ba065d0a2be02f09deb8044bba6a6d1267beaeee 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -3,35 +3,30 @@ use std::collections::BTreeSet; use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; -use crate::traft::DeactivateRequest; -use crate::traft::JoinRequest; use crate::traft::Peer; -use crate::traft::RaftId; -use crate::traft::TopologyRequest; +use crate::traft::{ + InstanceId, + // type aliases + RaftId, + ReplicasetId, +}; use raft::INVALID_INDEX; pub struct Topology { - peers: BTreeMap<RaftId, Peer>, - diff: BTreeSet<RaftId>, - to_replace: BTreeMap<RaftId, RaftId>, replication_factor: u8, - max_raft_id: RaftId, - instance_id_map: BTreeMap<String, RaftId>, - replicaset_map: BTreeMap<String, BTreeSet<RaftId>>, + + instance_map: BTreeMap<InstanceId, Peer>, + replicaset_map: BTreeMap<ReplicasetId, BTreeSet<InstanceId>>, } impl Topology { pub fn from_peers(mut peers: Vec<Peer>) -> Self { let mut ret = Self { - peers: Default::default(), - diff: Default::default(), - to_replace: Default::default(), replication_factor: 2, - max_raft_id: 0, - instance_id_map: Default::default(), + instance_map: Default::default(), replicaset_map: Default::default(), }; @@ -48,19 +43,22 @@ impl Topology { } fn put_peer(&mut self, peer: Peer) { - self.peers.insert(peer.raft_id, peer.clone()); - self.max_raft_id = std::cmp::max(self.max_raft_id, peer.raft_id); - self.instance_id_map.insert(peer.instance_id, peer.raft_id); + + let instance_id = peer.instance_id.clone(); + let replicaset_id = peer.replicaset_id.clone(); + + // FIXME remove old peer + self.instance_map.insert(instance_id.clone(), peer); self.replicaset_map - .entry(peer.replicaset_id.clone()) + .entry(replicaset_id) .or_default() - .insert(peer.raft_id); + .insert(instance_id); } - fn peer_by_instance_id(&self, instance_id: &str) -> Option<Peer> { - let raft_id = self.instance_id_map.get(instance_id)?; - self.peers.get(raft_id).cloned() + fn choose_instance_id(&self, raft_id: u64) -> String { + // FIXME: what if generated instance_id is already taken? + format!("i{raft_id}") } fn choose_replicaset_id(&self) -> String { @@ -80,417 +78,352 @@ impl Topology { } } - fn choose_instance_id(instance_id: Option<String>, raft_id: u64) -> String { - match instance_id { - Some(v) => v, - None => format!("i{raft_id}"), - } - } - - pub fn join(&mut self, req: &JoinRequest) -> Result<Peer, String> { - match &req.instance_id { - Some(instance_id) => match self.peer_by_instance_id(instance_id) { - Some(peer) => self.modify_existing_instance(peer, req), - None => self.join_new_instance(req), - }, - None => self.join_new_instance(req), - } - } - - fn modify_existing_instance(&mut self, peer: Peer, req: &JoinRequest) -> Result<Peer, String> { - match &req.replicaset_id { - Some(replicaset_id) if replicaset_id != &peer.replicaset_id => { - let e = format!( - std::concat!( - "{} already joined with a different replicaset_id,", - " requested: {},", - " existing: {}.", - ), - peer.instance_id, replicaset_id, peer.replicaset_id - ); + pub fn join( + &mut self, + instance_id: Option<String>, + replicaset_id: Option<String>, + advertise: String, + ) -> Result<Peer, String> { + if let Some(id) = instance_id.as_ref() { + let existing_peer: Option<&Peer> = self.instance_map.get(id); + + if matches!(existing_peer, Some(peer) if peer.active) { + let e = format!("{} is already joined", id); return Err(e); } - _ => (), - } - - let mut peer = peer; - peer.peer_address = req.advertise_address.clone(); - peer.voter = req.voter; - peer.is_active = true; - - if req.voter { - self.diff.insert(peer.raft_id); - } else { - let old_raft_id = peer.raft_id; - peer.raft_id = self.max_raft_id + 1; - - self.to_replace.insert(peer.raft_id, old_raft_id); } - self.put_peer(peer.clone()); - Ok(peer) - } - fn join_new_instance(&mut self, req: &JoinRequest) -> Result<Peer, String> { + // Anyway, `join` always produces a new raft_id. let raft_id = self.max_raft_id + 1; - let replicaset_id = match &req.replicaset_id { - Some(v) => v.clone(), - None => self.choose_replicaset_id(), - }; + let instance_id: String = instance_id.unwrap_or_else(|| self.choose_instance_id(raft_id)); + let instance_uuid = instance_uuid(&instance_id); + let replicaset_id: String = replicaset_id.unwrap_or_else(|| self.choose_replicaset_id()); let replicaset_uuid = replicaset_uuid(&replicaset_id); - let instance_id = Self::choose_instance_id(req.instance_id.clone(), raft_id); let peer = Peer { + instance_id, + instance_uuid, raft_id, - instance_id: instance_id.clone(), + peer_address: advertise.into(), replicaset_id, - commit_index: INVALID_INDEX, - instance_uuid: instance_uuid(&instance_id), replicaset_uuid, - peer_address: req.advertise_address.clone(), - voter: req.voter, - is_active: true, + commit_index: INVALID_INDEX, + active: true, }; - self.diff.insert(raft_id); self.put_peer(peer.clone()); Ok(peer) } - pub fn deactivate(&mut self, req: &DeactivateRequest) -> Result<Peer, String> { - let peer = match self.peer_by_instance_id(&req.instance_id) { - Some(peer) => peer, - None => { - return Err(format!( - "request to deactivate unknown instance {}", - &req.instance_id - )) - } - }; - - let peer = Peer { - voter: false, - is_active: false, - ..peer - }; - - self.diff.insert(peer.raft_id); - // no need to call put_peer, as the peer was already in the cluster - self.peers.insert(peer.raft_id, peer.clone()); - - Ok(peer) - } - - pub fn process(&mut self, req: &TopologyRequest) -> Result<Peer, String> { - match req { - TopologyRequest::Join(join) => self.join(join), - TopologyRequest::Deactivate(deactivate) => self.deactivate(deactivate), - } + #[allow(unused)] + pub fn set_advertise( + &mut self, + instance_id: String, + peer_address: String, + ) -> Result<Peer, String> { + let mut peer = self + .instance_map + .get_mut(&instance_id) + .ok_or_else(|| format!("unknown instance {}", instance_id))?; + + peer.peer_address = peer_address; + Ok(peer.clone()) } - pub fn diff(&self) -> Vec<Peer> { - self.diff - .iter() - .map(|id| self.peers.get(id).expect("peers must contain all peers")) - .cloned() - .collect() - } + pub fn set_active(&mut self, instance_id: String, active: bool) -> Result<Peer, String> { + let mut peer = self + .instance_map + .get_mut(&instance_id) + .ok_or_else(|| format!("unknown instance {}", instance_id))?; - pub fn to_replace(&self) -> Vec<(RaftId, Peer)> { - self.to_replace - .iter() - .map(|(new_id, &old_id)| { - let peer = self - .peers - .get(new_id) - .expect("peers must contain all peers") - .clone(); - (old_id, peer) - }) - .collect() + peer.active = active; + Ok(peer.clone()) } } // Create first peer in the cluster pub fn initial_peer( - cluster_id: String, instance_id: Option<String>, replicaset_id: Option<String>, - advertise_address: String, + advertise: String, ) -> Peer { let mut topology = Topology::from_peers(vec![]); - let req = JoinRequest { - cluster_id, - instance_id, - replicaset_id, - advertise_address, - voter: true, - }; - topology.join(&req).unwrap(); - topology.diff().pop().unwrap() + let mut peer = topology + .join(instance_id, replicaset_id, advertise) + .unwrap(); + peer.commit_index = 1; + peer } -#[cfg(test)] -mod tests { - use super::Topology; - use crate::traft::instance_uuid; - use crate::traft::replicaset_uuid; - use crate::traft::Peer; - - macro_rules! peers { - [ $( ( - $raft_id:expr, - $instance_id:literal, - $replicaset_id:literal, - $peer_address:literal, - $voter:literal - $(, $is_active:literal)? - $(,)? - ) ),* $(,)? ] => { - vec![$( - peer!($raft_id, $instance_id, $replicaset_id, $peer_address, $voter $(, $is_active)?) - ),*] - }; - } - - macro_rules! peer { - ( - $raft_id:expr, - $instance_id:literal, - $replicaset_id:literal, - $peer_address:literal, - $voter:literal - $(, $is_active:literal)? - $(,)? - ) => {{ - let peer = Peer { - raft_id: $raft_id, - peer_address: $peer_address.into(), - voter: $voter, - instance_id: $instance_id.into(), - replicaset_id: $replicaset_id.into(), - instance_uuid: instance_uuid($instance_id), - replicaset_uuid: replicaset_uuid($replicaset_id), - commit_index: raft::INVALID_INDEX, - is_active: true, - }; - $( let peer = Peer { is_active: $is_active, ..peer }; )? - peer - }}; - } - - macro_rules! join { - ( - $instance_id:literal, - $replicaset_id:expr, - $advertise_address:literal, - $voter:literal - ) => { - &crate::traft::TopologyRequest::Join(crate::traft::JoinRequest { - cluster_id: "cluster1".into(), - instance_id: Some($instance_id.into()), - replicaset_id: $replicaset_id.map(|v: &str| v.into()), - advertise_address: $advertise_address.into(), - voter: $voter, - }) - }; - } - - macro_rules! deactivate { - ($instance_id:literal) => { - &crate::traft::TopologyRequest::Deactivate(crate::traft::DeactivateRequest { - instance_id: $instance_id.into(), - cluster_id: "cluster1".into(), - }) - }; - } - - macro_rules! test_reqs { - ( - replication_factor: $replication_factor:literal, - init: $peers:expr, - req: [ $( $req:expr ),* $(,)?], - expected_diff: $expected:expr, - $( expected_to_replace: $expected_to_replace:expr, )? - ) => { - let mut t = Topology::from_peers($peers) - .with_replication_factor($replication_factor); - $( t.process($req).unwrap(); )* - - pretty_assertions::assert_eq!(t.diff(), $expected); - $( pretty_assertions::assert_eq!(t.to_replace(), $expected_to_replace); )? - }; - } - - #[test] - fn test_simple() { - assert_eq!(Topology::from_peers(vec![]).diff(), vec![]); - - let peers = peers![(1, "i1", "R1", "addr:1", true)]; - assert_eq!(Topology::from_peers(peers).diff(), vec![]); - - test_reqs!( - replication_factor: 1, - init: peers![], - req: [ - join!("i1", None, "nowhere", true), - join!("i2", None, "nowhere", true), - ], - expected_diff: peers![ - (1, "i1", "r1", "nowhere", true), - (2, "i2", "r2", "nowhere", true), - ], - ); - - test_reqs!( - replication_factor: 1, - init: peers![ - (1, "i1", "R1", "addr:1", true), - ], - req: [ - join!("i2", Some("R2"), "addr:2", false), - ], - expected_diff: peers![ - (2, "i2", "R2", "addr:2", false), - ], - ); - } - - #[test] - fn test_override() { - test_reqs!( - replication_factor: 1, - init: peers![ - (1, "i1", "R1", "addr:1", false), - ], - req: [ - join!("i1", None, "addr:2", true), - ], - expected_diff: peers![ - (1, "i1", "R1", "addr:2", true), - ], - ); - } - - #[test] - fn test_batch_overlap() { - test_reqs!( - replication_factor: 1, - init: peers![], - req: [ - join!("i1", Some("R1"), "addr:1", false), - join!("i1", None, "addr:2", true), - ], - expected_diff: peers![ - (1, "i1", "R1", "addr:2", true), - ], - ); - } - - #[test] - fn test_replicaset_mismatch() { - let expected_error = concat!( - "i3 already joined with a different replicaset_id,", - " requested: R-B,", - " existing: R-A.", - ); - - let peers = peers![(3, "i3", "R-A", "x:1", false)]; - let mut topology = Topology::from_peers(peers); - topology - .process(join!("i3", Some("R-B"), "x:2", true)) - .map_err(|e| assert_eq!(e, expected_error)) - .unwrap_err(); - assert_eq!(topology.diff(), vec![]); - assert_eq!(topology.to_replace(), vec![]); - - let peers = peers![(2, "i2", "R-A", "nowhere", true)]; - let mut topology = Topology::from_peers(peers); - topology - .process(join!("i3", Some("R-A"), "y:1", false)) - .unwrap(); - topology - .process(join!("i3", Some("R-B"), "y:2", true)) - .map_err(|e| assert_eq!(e, expected_error)) - .unwrap_err(); - assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]); - assert_eq!(topology.to_replace(), vec![]); - } - - #[test] - fn test_replication_factor() { - test_reqs!( - replication_factor: 2, - init: peers![ - (9, "i9", "r9", "nowhere", false), - (10, "i9", "r9", "nowhere", false), - ], - req: [ - join!("i1", None, "addr:1", true), - join!("i2", None, "addr:2", false), - join!("i3", None, "addr:3", false), - join!("i4", None, "addr:4", false), - ], - expected_diff: peers![ - (11, "i1", "r1", "addr:1", true), - (12, "i2", "r1", "addr:2", false), - (13, "i3", "r2", "addr:3", false), - (14, "i4", "r2", "addr:4", false), - ], - ); - } - - #[test] - fn test_replace() { - test_reqs!( - replication_factor: 2, - init: peers![ - (1, "i1", "r1", "nowhere", false), - ], - req: [ - join!("i1", None, "addr:2", false), - ], - expected_diff: peers![], - expected_to_replace: vec![ - (1, peer!(2, "i1", "r1", "addr:2", false)), - ], - ); - } - - #[test] - fn test_deactivation() { - test_reqs!( - replication_factor: 1, - init: peers![ - (1, "deactivate", "r1", "nowhere", true, true), - (2, "activate_learner", "r2", "nowhere", false, false), - (3, "activate_voter", "r3", "nowhere", false, false), - ], - req: [ - deactivate!("deactivate"), - join!("activate_learner", None, "nowhere", false), - join!("activate_voter", None, "nowhere", true), - ], - expected_diff: peers![ - (1, "deactivate", "r1", "nowhere", false, false), - (3, "activate_voter", "r3", "nowhere", true, true), - ], - expected_to_replace: vec![ - (2, peer!(4, "activate_learner", "r2", "nowhere", false, true)), - ], - ); - - test_reqs!( - replication_factor: 1, - init: peers![], - req: [ - join!("deactivate", Some("r1"), "nowhere", true), - deactivate!("deactivate"), - deactivate!("deactivate"), - deactivate!("deactivate"), - ], - expected_diff: peers![ - (1, "deactivate", "r1", "nowhere", false, false), - ], - ); - } -} +// #[cfg(test)] +// mod tests { +// use super::Topology; +// use crate::traft::instance_uuid; +// use crate::traft::replicaset_uuid; +// use crate::traft::Peer; + +// // macro_rules! peers { +// // [ $( ( +// // $raft_id:expr, +// // $instance_id:literal, +// // $replicaset_id:literal, +// // $peer_address:literal, +// // $(, $is_active:literal)? +// // $(,)? +// // ) ),* $(,)? ] => { +// // vec![$( +// // peer!($raft_id, $instance_id, $replicaset_id, $peer_address $(, $is_active)?) +// // ),*] +// // }; +// // } + +// macro_rules! peer { +// ( +// $raft_id:expr, +// $instance_id:literal, +// $replicaset_id:literal, +// $peer_address:literal, +// $(, $active:literal)? +// $(,)? +// ) => {{ +// let peer = Peer { +// raft_id: $raft_id, +// peer_address: $peer_address, +// instance_id: $instance_id.into(), +// replicaset_id: $replicaset_id.into(), +// instance_uuid: instance_uuid($instance_id), +// replicaset_uuid: replicaset_uuid($replicaset_id), +// commit_index: raft::INVALID_INDEX, +// active: true, +// }; +// $( let peer = Peer { active: $active, ..peer }; )? +// peer +// }}; +// } + +// // macro_rules! join { +// // ( +// // $instance_id:literal, +// // $replicaset_id:expr, +// // $advertise_address:literal, +// // $voter:literal +// // ) => { +// // &crate::traft::TopologyRequest::Join(crate::traft::JoinRequest { +// // cluster_id: "cluster1".into(), +// // instance_id: Some($instance_id.into()), +// // replicaset_id: $replicaset_id.map(|v: &str| v.into()), +// // advertise_address: $advertise_address.into(), +// // voter: $voter, +// // }) +// // }; +// // } + +// // macro_rules! deactivate { +// // ($instance_id:literal) => { +// // &crate::traft::TopologyRequest::Deactivate(crate::traft::DeactivateRequest { +// // instance_id: $instance_id.into(), +// // cluster_id: "cluster1".into(), +// // }) +// // }; +// // } + +// // macro_rules! test_reqs { +// // ( +// // replication_factor: $replication_factor:literal, +// // init: $peers:expr, +// // req: [ $( $req:expr ),* $(,)?], +// // expected_diff: $expected:expr, +// // $( expected_to_replace: $expected_to_replace:expr, )? +// // ) => { +// // let mut t = Topology::from_peers($peers) +// // .with_replication_factor($replication_factor); +// // $( t.process($req).unwrap(); )* + +// // pretty_assertions::assert_eq!(t.diff(), $expected); +// // $( pretty_assertions::assert_eq!(t.to_replace(), $expected_to_replace); )? +// // }; +// // } + +// #[test] +// fn test_simple() { +// let mut topology = Topology::from_peers(vec![]); + +// assert_eq!( +// topology.join(None, None, "addr:1".into()).unwrap(), +// peer!(1, "i1", "R1", "addr:1", true) // { Peer::default() } +// ) + +// // assert_eq!( +// // topology.join(None, Some("R2"), "addr:1").unwrap(), +// // peer!(1, "i1", "R1", "addr:1", true) +// // ) + +// // let peers = peers![(1, "i1", "R1", "addr:1", true)]; +// // assert_eq!(Topology::from_peers(peers).diff(), vec![]); + +// // test_reqs!( +// // replication_factor: 1, +// // init: peers![], +// // req: [ +// // join!("i1", None, "nowhere", true), +// // join!("i2", None, "nowhere", true), +// // ], +// // expected_diff: peers![ +// // (1, "i1", "r1", "nowhere", true), +// // (2, "i2", "r2", "nowhere", true), +// // ], +// // ); + +// // test_reqs!( +// // replication_factor: 1, +// // init: peers![ +// // (1, "i1", "R1", "addr:1", true), +// // ], +// // req: [ +// // join!("i2", Some("R2"), "addr:2", false), +// // ], +// // expected_diff: peers![ +// // (2, "i2", "R2", "addr:2", false), +// // ], +// // ); +// } + +// // #[test] +// // fn test_override() { +// // test_reqs!( +// // replication_factor: 1, +// // init: peers![ +// // (1, "i1", "R1", "addr:1", false), +// // ], +// // req: [ +// // join!("i1", None, "addr:2", true), +// // ], +// // expected_diff: peers![ +// // (1, "i1", "R1", "addr:2", true), +// // ], +// // ); +// // } + +// // #[test] +// // fn test_batch_overlap() { +// // test_reqs!( +// // replication_factor: 1, +// // init: peers![], +// // req: [ +// // join!("i1", Some("R1"), "addr:1", false), +// // join!("i1", None, "addr:2", true), +// // ], +// // expected_diff: peers![ +// // (1, "i1", "R1", "addr:2", true), +// // ], +// // ); +// // } + +// // #[test] +// // fn test_replicaset_mismatch() { +// // let expected_error = concat!( +// // "i3 already joined with a different replicaset_id,", +// // " requested: R-B,", +// // " existing: R-A.", +// // ); + +// // let peers = peers![(3, "i3", "R-A", "x:1", false)]; +// // let mut topology = Topology::from_peers(peers); +// // topology +// // .process(join!("i3", Some("R-B"), "x:2", true)) +// // .map_err(|e| assert_eq!(e, expected_error)) +// // .unwrap_err(); +// // assert_eq!(topology.diff(), vec![]); +// // assert_eq!(topology.to_replace(), vec![]); + +// // let peers = peers![(2, "i2", "R-A", "nowhere", true)]; +// // let mut topology = Topology::from_peers(peers); +// // topology +// // .process(join!("i3", Some("R-A"), "y:1", false)) +// // .unwrap(); +// // topology +// // .process(join!("i3", Some("R-B"), "y:2", true)) +// // .map_err(|e| assert_eq!(e, expected_error)) +// // .unwrap_err(); +// // assert_eq!(topology.diff(), peers![(3, "i3", "R-A", "y:1", false)]); +// // assert_eq!(topology.to_replace(), vec![]); +// // } + +// // #[test] +// // fn test_replication_factor() { +// // test_reqs!( +// // replication_factor: 2, +// // init: peers![ +// // (9, "i9", "r9", "nowhere", false), +// // (10, "i9", "r9", "nowhere", false), +// // ], +// // req: [ +// // join!("i1", None, "addr:1", true), +// // join!("i2", None, "addr:2", false), +// // join!("i3", None, "addr:3", false), +// // join!("i4", None, "addr:4", false), +// // ], +// // expected_diff: peers![ +// // (11, "i1", "r1", "addr:1", true), +// // (12, "i2", "r1", "addr:2", false), +// // (13, "i3", "r2", "addr:3", false), +// // (14, "i4", "r2", "addr:4", false), +// // ], +// // ); +// // } + +// // #[test] +// // fn test_replace() { +// // test_reqs!( +// // replication_factor: 2, +// // init: peers![ +// // (1, "i1", "r1", "nowhere", false), +// // ], +// // req: [ +// // join!("i1", None, "addr:2", false), +// // ], +// // expected_diff: peers![], +// // expected_to_replace: vec![ +// // (1, peer!(2, "i1", "r1", "addr:2", false)), +// // ], +// // ); +// // } + +// // #[test] +// // fn test_deactivation() { +// // test_reqs!( +// // replication_factor: 1, +// // init: peers![ +// // (1, "deactivate", "r1", "nowhere", true, true), +// // (2, "activate_learner", "r2", "nowhere", false, false), +// // (3, "activate_voter", "r3", "nowhere", false, false), +// // ], +// // req: [ +// // deactivate!("deactivate"), +// // join!("activate_learner", None, "nowhere", false), +// // join!("activate_voter", None, "nowhere", true), +// // ], +// // expected_diff: peers![ +// // (1, "deactivate", "r1", "nowhere", false, false), +// // (3, "activate_voter", "r3", "nowhere", true, true), +// // ], +// // expected_to_replace: vec![ +// // (2, peer!(4, "activate_learner", "r2", "nowhere", false, true)), +// // ], +// // ); + +// // test_reqs!( +// // replication_factor: 1, +// // init: peers![], +// // req: [ +// // join!("deactivate", Some("r1"), "nowhere", true), +// // deactivate!("deactivate"), +// // deactivate!("deactivate"), +// // deactivate!("deactivate"), +// // ], +// // expected_diff: peers![ +// // (1, "deactivate", "r1", "nowhere", false, false), +// // ], +// // ); +// // } +// }