diff --git a/src/governor/mod.rs b/src/governor/mod.rs index c87917c6a35949f1a376e5ccd6eae86397a6f9f1..9d0d2f266e2ae8f290c6b5ac4172f8de1f8714c2 100644 --- a/src/governor/mod.rs +++ b/src/governor/mod.rs @@ -32,6 +32,7 @@ use plan::stage::*; impl Loop { const SYNC_TIMEOUT: Duration = Duration::from_secs(10); const RETRY_TIMEOUT: Duration = Duration::from_millis(250); + const UPDATE_INSTANCE_TIMEOUT: Duration = Duration::from_secs(3); async fn iter_fn( Args { @@ -219,7 +220,7 @@ impl Loop { "current_grade" => %current_grade, ] async { - node.handle_update_instance_request_and_wait(req)? + node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)? } } } @@ -301,7 +302,7 @@ impl Loop { "current_grade" => %current_grade, ] async { - node.handle_update_instance_request_and_wait(req)? + node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)? } } } @@ -344,7 +345,7 @@ impl Loop { "current_grade" => %current_grade, ] async { - node.handle_update_instance_request_and_wait(req)? + node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)? } } } @@ -382,7 +383,7 @@ impl Loop { "current_grade" => %current_grade, ] async { - node.handle_update_instance_request_and_wait(req)? + node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)? } } } @@ -436,7 +437,7 @@ impl Loop { "current_grade" => %current_grade, ] async { - node.handle_update_instance_request_and_wait(req)? + node.handle_update_instance_request_and_wait(req, Loop::UPDATE_INSTANCE_TIMEOUT)? } } } diff --git a/src/lib.rs b/src/lib.rs index d2f0374ac3750cf8d11b59cd671e34d165edb1ca..432631f15921e8ebef2281c64930daee0a6759b6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,8 +4,7 @@ use serde::{Deserialize, Serialize}; use ::raft::prelude as raft; use ::tarantool::error::Error as TntError; use ::tarantool::fiber; -use ::tarantool::fiber::r#async::timeout; -use ::tarantool::fiber::r#async::timeout::IntoTimeout as _; +use ::tarantool::fiber::r#async::timeout::{self, IntoTimeout}; use ::tarantool::tlua; use ::tarantool::transaction::transaction; use rpc::{join, update_instance}; @@ -354,13 +353,11 @@ fn start_discover(args: &args::Run, to_supervisor: ipc::Sender<IpcMessage>) { fn start_boot(args: &args::Run) { tlog!(Info, ">>>>> start_boot()"); - let (instance, address, _) = traft::topology::initial_instance( + let instance = traft::topology::initial_instance( args.instance_id.clone(), args.replicaset_id.clone(), - args.advertise_address(), args.failure_domain(), - ) - .expect("failed adding initial instance"); + ); let raft_id = instance.raft_id; let instance_id = instance.instance_id.clone(); @@ -408,7 +405,10 @@ fn start_boot(args: &args::Run) { init_entries_push_op( op::Dml::insert( ClusterwideSpaceId::Address, - &traft::PeerAddress { raft_id, address }, + &traft::PeerAddress { + raft_id, + address: args.advertise_address(), + }, ) .expect("cannot fail") .into(), @@ -486,8 +486,8 @@ fn start_boot(args: &args::Run) { postjoin(args, storage, raft_storage) } -fn start_join(args: &args::Run, leader_address: String) { - tlog!(Info, ">>>>> start_join({leader_address})"); +fn start_join(args: &args::Run, instance_address: String) { + tlog!(Info, ">>>>> start_join({instance_address})"); let req = join::Request { cluster_id: args.cluster_id.clone(), @@ -497,33 +497,20 @@ fn start_join(args: &args::Run, leader_address: String) { failure_domain: args.failure_domain(), }; - let mut leader_address = leader_address; - // Arch memo. // - There must be no timeouts. 
Retrying may lead to flooding the // topology with phantom instances. No worry, specifying a // particular `instance_id` for every instance protects from that // flood. // - It's fine to retry "connection refused" errors. - // - TODO renew leader_address if the current one says it's not a - // leader. - let resp: rpc::join::OkResponse = loop { + let resp: rpc::join::Response = loop { let now = Instant::now(); // TODO: exponential delay let timeout = Duration::from_secs(1); - match fiber::block_on(rpc::network_call(&leader_address, &req)) { - Ok(join::Response::Ok(resp)) => { + match fiber::block_on(rpc::network_call(&instance_address, &req)) { + Ok(resp) => { break resp; } - Ok(join::Response::ErrNotALeader(maybe_new_leader)) => { - tlog!(Warning, "join request failed: not a leader, retry..."); - if let Some(new_leader) = maybe_new_leader { - leader_address = new_leader.address; - } else { - fiber::sleep(Duration::from_millis(100)); - } - continue; - } Err(TntError::Tcp(e)) => { tlog!(Warning, "join request failed: {e}, retry..."); fiber::sleep(timeout.saturating_sub(now.elapsed())); @@ -627,6 +614,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces tlog!(Error, "failed setting on_shutdown trigger: {e}"); } + // Activates instance loop { let instance = storage .instances @@ -635,6 +623,7 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces let cluster_id = raft_storage .cluster_id() .expect("storage should never fail"); + // Doesn't have to be leader - can be any online peer let leader_id = node.status().leader_id; let leader_address = leader_id.and_then(|id| storage.peer_addresses.try_get(id).ok()); let Some(leader_address) = leader_address else { @@ -653,31 +642,22 @@ fn postjoin(args: &args::Run, storage: Clusterwide, raft_storage: RaftSpaceAcces let req = update_instance::Request::new(instance.instance_id, cluster_id) .with_target_grade(TargetGradeVariant::Online) .with_failure_domain(args.failure_domain()); - - // It's necessary to call `proc_update_instance` remotely on a - // leader over net_box. It always fails otherwise. Only the - // leader is permitted to propose changes to _pico_instance. 
let now = Instant::now(); let timeout = Duration::from_secs(10); match fiber::block_on(rpc::network_call(&leader_address, &req).timeout(timeout)) { - Ok(update_instance::Response::Ok) => { + Ok(update_instance::Response {}) => { break; } - Ok(update_instance::Response::ErrNotALeader) => { - tlog!(Warning, "failed to activate myself: not a leader, retry..."); - fiber::sleep(Duration::from_millis(100)); - continue; - } Err(timeout::Error::Failed(TntError::Tcp(e))) => { - tlog!(Warning, "failed to activate myself: {e}, retry..."); + tlog!(Warning, "failed to activate myself: {e}, retrying..."); fiber::sleep(timeout.saturating_sub(now.elapsed())); continue; } Err(e) => { - tlog!(Error, "failed to activate myself: {e}"); + tlog!(Error, "failed to activate myself: {e}, shutting down..."); std::process::exit(-1); } - }; + } } } diff --git a/src/luamod.rs b/src/luamod.rs index ebf5d8204ae6df5ab49ea26f4c70fec94915ed1a..19192fd2fb6200c9b0a3a10c30a50c947c521aef 100644 --- a/src/luamod.rs +++ b/src/luamod.rs @@ -1051,7 +1051,8 @@ pub(crate) fn setup(args: &args::Run) { -> traft::Result<RaftIndex> { let op = op::Dml::from_lua_args(op).map_err(traft::error::Error::other)?; let predicate = cas::Predicate::from_lua_args(predicate.unwrap_or_default())?; - let (index, _) = compare_and_swap(op.into(), predicate)?; + // TODO: Add timeout to API + let (index, _) = compare_and_swap(op.into(), predicate, Duration::from_secs(3))?; Ok(index) }, ), diff --git a/src/on_shutdown.rs b/src/on_shutdown.rs index 250b0cbbf945ac94b2072bd3fec6e98918af4ba0..e854a454a3ca8c74e88d7b25b3139386048c3ece 100644 --- a/src/on_shutdown.rs +++ b/src/on_shutdown.rs @@ -8,7 +8,6 @@ use crate::rpc; use crate::storage::ClusterwideSpaceId; use crate::tlog; use crate::traft; -use crate::traft::error::Error; use crate::traft::node; use crate::unwrap_ok_or; @@ -78,44 +77,14 @@ fn go_offline() -> traft::Result<()> { .with_target_grade(TargetGradeVariant::Offline); loop { - let Some(leader_id) = node.status().leader_id else { - node.wait_status(); - continue - }; - let now = Instant::now(); let wait_before_retry = Duration::from_millis(300); - if leader_id == raft_id { - match node.handle_update_instance_request_and_wait(req.clone()) { - Err(Error::NotALeader) => { - // We've lost leadership while waiting for NodeImpl - // mutex. Retry after a small pause. - fiber::sleep(wait_before_retry.saturating_sub(now.elapsed())); - continue; - } - Err(e) => break Err(e), - Ok(_) => break Ok(()), - } - } - - let Some(leader_address) = node.storage.peer_addresses.get(leader_id)? else { - // Leader address is unknown, maybe later we'll find it out? 
- fiber::sleep(wait_before_retry.saturating_sub(now.elapsed())); - continue; - }; - let res = match fiber::block_on(rpc::network_call(&leader_address, &req)) { - Ok(rpc::update_instance::Response::Ok) => Ok(()), - Ok(rpc::update_instance::Response::ErrNotALeader) => Err(Error::NotALeader), - Err(e) => Err(e.into()), - }; - - match res { - Ok(()) => break Ok(()), + match node.handle_update_instance_request_and_wait(req.clone(), wait_before_retry) { + Ok(_) => break Ok(()), Err(e) => { tlog!(Warning, "failed setting target grade Offline: {e}, retrying ..."; - "raft_id" => leader_id, ); fiber::sleep(wait_before_retry.saturating_sub(now.elapsed())); continue; diff --git a/src/rpc/expel.rs b/src/rpc/expel.rs index 6266f9bff21e4b422342dbda863f640885c2540a..4e37670215a1b1a552bfae10eb08b3f87d638830 100644 --- a/src/rpc/expel.rs +++ b/src/rpc/expel.rs @@ -1,11 +1,15 @@ +use std::time::Duration; + use crate::instance::grade::TargetGradeVariant; use crate::instance::InstanceId; use crate::rpc; use crate::traft::Result; use crate::traft::{error::Error, node}; +const TIMEOUT: Duration = Duration::from_secs(10); + crate::define_rpc_request! { - fn proc_expel_on_leader(req: Request) -> Result<Response> { + fn proc_expel(req: Request) -> Result<Response> { let node = node::global()?; let raft_storage = &node.raft_storage; let cluster_id = raft_storage.cluster_id()?; @@ -17,21 +21,15 @@ crate::define_rpc_request! { }); } - let leader_id = node.status().leader_id.ok_or(Error::LeaderUnknown)?; - if node.raft_id() != leader_id { - return Err(Error::NotALeader); - } - let req = rpc::update_instance::Request::new(req.instance_id, req.cluster_id) .with_target_grade(TargetGradeVariant::Expelled); - node.handle_update_instance_request_and_wait(req)?; + node.handle_update_instance_request_and_wait(req, TIMEOUT)?; Ok(Response {}) } /// A request to expel an instance. /// - /// This request is only handled by the leader. /// Use [`redirect::Request`] for automatic redirection from any instance to /// leader. pub struct Request { diff --git a/src/rpc/join.rs b/src/rpc/join.rs index d2b697bad43ed9961d7ad67c4be6c16fb93fd906..18c959c776e78f168fcf04deb01dcf1103d84fcd 100644 --- a/src/rpc/join.rs +++ b/src/rpc/join.rs @@ -1,17 +1,12 @@ +use std::time::Duration; + use crate::failure_domain::FailureDomain; use crate::instance::{Instance, InstanceId}; use crate::replicaset::ReplicasetId; use crate::storage::ToEntryIter as _; use crate::traft::{error::Error, node, Address, PeerAddress, Result}; -#[derive(Clone, Debug, ::serde::Serialize, ::serde::Deserialize)] -pub struct OkResponse { - pub instance: Box<Instance>, - pub peer_addresses: Vec<PeerAddress>, - pub box_replication: Vec<Address>, - // Other parameters necessary for box.cfg() - // TODO -} +const TIMEOUT: Duration = Duration::from_secs(10); crate::define_rpc_request! { fn proc_raft_join(req: Request) -> Result<Response> { @@ -25,30 +20,15 @@ crate::define_rpc_request! { }); } - match node.handle_join_request_and_wait(req) { - Ok((instance, replication_addresses)) => { - // A joined instance needs to communicate with other nodes. - // TODO: limit the number of entries sent to reduce response size. - let peer_addresses = node.storage.peer_addresses.iter()?.collect(); + let (instance, replication_addresses) = node.handle_join_request_and_wait(req, TIMEOUT)?; + // A joined instance needs to communicate with other nodes. + // TODO: limit the number of entries sent to reduce response size. 
+ let peer_addresses = node.storage.peer_addresses.iter()?.collect(); - Ok(Response::Ok(OkResponse { - instance, - peer_addresses, - box_replication: replication_addresses.into_iter().collect(), - })) - - }, - Err(Error::NotALeader) => { - let leader_id = node.status().leader_id; - let leader_address = leader_id.and_then(|id| node.storage.peer_addresses.try_get(id).ok()); - let leader = match (leader_id, leader_address) { - (Some(raft_id), Some(address)) => Some(PeerAddress{raft_id, address}), - (_, _) => None - }; - Ok(Response::ErrNotALeader(leader)) - } - Err(e) => Err(e), - } + Ok(Response { + instance, + peer_addresses, + box_replication: replication_addresses.into_iter().collect()}) } /// Request to join the cluster. @@ -60,9 +40,11 @@ crate::define_rpc_request! { pub failure_domain: FailureDomain, } - /// Response to a [`join::Request`](Request). - pub enum Response { - Ok(OkResponse), - ErrNotALeader(Option<PeerAddress>), + pub struct Response { + pub instance: Box<Instance>, + pub peer_addresses: Vec<PeerAddress>, + pub box_replication: Vec<Address>, + // Other parameters necessary for box.cfg() + // TODO } } diff --git a/src/rpc/update_instance.rs b/src/rpc/update_instance.rs index 449c9ddc442d44e432d5a64cc5cffb4eb3e74267..e06aa338f1f04e33d3b793961095070f9c12da9a 100644 --- a/src/rpc/update_instance.rs +++ b/src/rpc/update_instance.rs @@ -1,3 +1,5 @@ +use std::time::Duration; + use crate::failure_domain::FailureDomain; use crate::instance::grade::{CurrentGrade, TargetGradeVariant}; use crate::instance::InstanceId; @@ -5,6 +7,8 @@ use crate::tlog; use crate::traft::Result; use crate::traft::{error::Error, node}; +const TIMEOUT: Duration = Duration::from_secs(10); + crate::define_rpc_request! { fn proc_update_instance(req: Request) -> Result<Response> { let node = node::global()?; @@ -20,16 +24,13 @@ crate::define_rpc_request! { let mut req = req; let instance_id = &*req.instance_id; if let Some(current_grade) = req.current_grade.take() { - tlog!(Warning, "attempt to change current_grade by instance"; + tlog!(Warning, "attempt to change current_grade for instance"; "instance_id" => instance_id, "current_grade" => %current_grade, ); } - match node.handle_update_instance_request_and_wait(req) { - Ok(_) => Ok(Response::Ok {}), - Err(Error::NotALeader) => Ok(Response::ErrNotALeader), - Err(e) => Err(e), - } + node.handle_update_instance_request_and_wait(req, TIMEOUT)?; + Ok(Response {}) } /// Request to update the instance in the storage. @@ -44,11 +45,7 @@ crate::define_rpc_request! 
{ pub failure_domain: Option<FailureDomain>, } - /// Response to a [`Request`] - pub enum Response { - Ok, - ErrNotALeader, - } + pub struct Response {} } impl Request { diff --git a/src/schema.rs b/src/schema.rs index 11c07fda52977f3c58f13b4db47b29d6c94e12da..27506768fa3c67c3c3ee2e9e99a0866371e74110 100644 --- a/src/schema.rs +++ b/src/schema.rs @@ -626,9 +626,9 @@ pub fn prepare_schema_change(op: Op, timeout: Duration) -> traft::Result<RaftInd ranges: vec![ cas::Range::new(ClusterwideSpaceId::Property) .eq((PropertyName::PendingSchemaChange,)), - cas::Range::new(ClusterwideSpaceId::Property as _) + cas::Range::new(ClusterwideSpaceId::Property) .eq((PropertyName::PendingSchemaVersion,)), - cas::Range::new(ClusterwideSpaceId::Property as _) + cas::Range::new(ClusterwideSpaceId::Property) .eq((PropertyName::GlobalSchemaVersion,)), cas::Range::new(ClusterwideSpaceId::Property) .eq((PropertyName::NextSchemaVersion,)), diff --git a/src/storage.rs b/src/storage.rs index 26491364837d38bd6f33deb956f4dd9197dc9ceb..5a78e31f56e06a092b97ca7d993e43b3d5908d60 100644 --- a/src/storage.rs +++ b/src/storage.rs @@ -1135,6 +1135,14 @@ impl PeerAddresses { self.get(raft_id)? .ok_or(Error::AddressUnknownForRaftId(raft_id)) } + + #[inline] + pub fn addresses_by_ids( + &self, + ids: impl IntoIterator<Item = RaftId>, + ) -> Result<HashSet<traft::Address>> { + ids.into_iter().map(|id| self.try_get(id)).collect() + } } impl ToEntryIter for PeerAddresses { @@ -1202,7 +1210,7 @@ impl Instances { Ok(()) } - /// Find a instance by `id` (see trait [`InstanceId`]). + /// Finds an instance by `id` (see trait [`InstanceId`]). #[inline(always)] pub fn get(&self, id: &impl InstanceId) -> Result<Instance> { let res = id @@ -1212,7 +1220,17 @@ impl Instances { Ok(res) } - /// Find a instance by `id` (see `InstanceId`) and return a single field + /// Checks if an instance with `id` (see trait [`InstanceId`]) is present. + #[inline] + pub fn contains(&self, id: &impl InstanceId) -> Result<bool> { + match id.find_in(self) { + Ok(_) => Ok(true), + Err(Error::NoInstanceWithInstanceId(_)) => Ok(false), + Err(err) => Err(err), + } + } + + /// Finds an instance by `id` (see `InstanceId`) and return a single field /// specified by `F` (see `InstanceFieldDef` & `instance_field` module). #[inline(always)] pub fn field<F>(&self, id: &impl InstanceId) -> Result<F::Type> @@ -1224,7 +1242,7 @@ impl Instances { Ok(res) } - /// Return an iterator over all instances. Items of the iterator are + /// Returns an iterator over all instances. Items of the iterator are /// specified by `F` (see `InstanceFieldDef` & `instance_field` module). #[inline(always)] pub fn instances_fields<F>(&self) -> Result<InstancesFields<F>> diff --git a/src/traft/node.rs b/src/traft/node.rs index ab3e8f366380b281d0b5c286fcca188dbb001d10..43a6ae95dd771f6b642555dbcb63f0908f483fa2 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -5,6 +5,7 @@ //! - handling configuration changes, //! - processing raft `Ready` - persisting entries, communicating with other raft nodes. 
+use crate::cas; use crate::governor; use crate::has_grades; use crate::instance::Instance; @@ -16,7 +17,6 @@ use crate::schema::{Distribution, IndexDef, SpaceDef}; use crate::storage::acl; use crate::storage::ddl_meta_drop_space; use crate::storage::SnapshotData; -use crate::storage::ToEntryIter as _; use crate::storage::{ddl_abort_on_master, ddl_meta_space_update_operable}; use crate::storage::{local_schema_version, set_local_schema_version}; use crate::storage::{Clusterwide, ClusterwideSpaceId, PropertyName}; @@ -38,7 +38,6 @@ use crate::traft::RaftIndex; use crate::traft::RaftSpaceAccess; use crate::traft::RaftTerm; use crate::traft::Topology; -use crate::unwrap_some_or; use crate::util::instant_saturating_add; use crate::util::AnyWithTypeName; use crate::warn_or_panic; @@ -64,13 +63,13 @@ use ::tarantool::tuple::Decode; use ::tarantool::vclock::Vclock; use protobuf::Message as _; use std::cell::Cell; +use std::cell::RefCell; use std::cmp::Ordering; use std::collections::{HashMap, HashSet}; use std::convert::TryFrom; use std::rc::Rc; use std::time::Duration; use std::time::Instant; -use tarantool::tuple::Tuple; use ApplyEntryResult::*; type RawNode = raft::RawNode<RaftSpaceAccess>; @@ -145,6 +144,7 @@ pub struct Node { pub(crate) governor_loop: governor::Loop, status: watch::Receiver<Status>, watchers: Rc<Mutex<StorageWatchers>>, + topology: Rc<RefCell<Topology>>, } impl std::fmt::Debug for Node { @@ -159,7 +159,8 @@ impl Node { /// Initialize the raft node. /// **This function yields** pub fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> { - let node_impl = NodeImpl::new(storage.clone(), raft_storage.clone())?; + let topology = Rc::new(RefCell::new(Topology::from(storage.clone()))); + let node_impl = NodeImpl::new(storage.clone(), raft_storage.clone(), topology.clone())?; let raft_id = node_impl.raft_id(); let status = node_impl.status.subscribe(); @@ -180,6 +181,7 @@ impl Node { raft_storage, status, watchers, + topology, }; // Wait for the node to enter the main loop @@ -320,28 +322,94 @@ impl Node { /// entries to the raft log (if successful). /// /// Returns the resulting [`Instance`] when the entry is committed. - /// - /// Returns an error if the callee node isn't a raft leader. - /// - /// **This function yields** + // TODO: to make this function async and have an outer timeout, + // wait_* fns also need to be async. pub fn handle_join_request_and_wait( &self, req: rpc::join::Request, + timeout: Duration, ) -> traft::Result<(Box<Instance>, HashSet<Address>)> { - let (notify_addr, notify_instance, replication_addresses) = - self.raw_operation(|node_impl| node_impl.process_join_request_async(req))?; - fiber::block_on(async { - let (addr, instance): ( - _, - Result<Result<Option<Tuple>, ::tarantool::error::Error>, _>, - ) = futures::join!(notify_addr.recv_any(), notify_instance.recv()); - addr?; - let instance = instance?? 
- .expect("option is always some") - .decode() - .expect("decoding should not fail"); - Ok((Box::new(instance), replication_addresses)) - }) + let deadline = instant_saturating_add(Instant::now(), timeout); + + loop { + let instance = self + .topology + .borrow() + .build_instance( + req.instance_id.as_ref(), + req.replicaset_id.as_ref(), + &req.failure_domain, + ) + .map_err(RaftError::ConfChangeError)?; + let mut replication_addresses = self.storage.peer_addresses.addresses_by_ids( + self.topology + .borrow() + .get_replication_ids(&instance.replicaset_id), + )?; + replication_addresses.insert(req.advertise_address.clone()); + let peer_address = traft::PeerAddress { + raft_id: instance.raft_id, + address: req.advertise_address.clone(), + }; + let op_addr = Dml::replace(ClusterwideSpaceId::Address, &peer_address) + .expect("encoding should not fail"); + let op_instance = Dml::replace(ClusterwideSpaceId::Instance, &instance) + .expect("encoding should not fail"); + let ranges = vec![ + cas::Range::new(ClusterwideSpaceId::Instance), + cas::Range::new(ClusterwideSpaceId::Address), + cas::Range::new(ClusterwideSpaceId::Property) + .eq((PropertyName::ReplicationFactor,)), + ]; + macro_rules! handle_result { + ($res:expr) => { + match $res { + Ok((index, term)) => { + self.wait_index( + index, + deadline.saturating_duration_since(Instant::now()), + )?; + if term != raft::Storage::term(&self.raft_storage, index)? { + // leader switched - retry + self.wait_status(); + continue; + } + } + Err(err) => { + if err.is_cas_err() | err.is_term_mismatch_err() { + // cas error - retry + fiber::sleep(Duration::from_millis(500)); + continue; + } else { + return Err(err); + } + } + } + }; + } + // Only in this order - so that when instance exists - address will always be there. + handle_result!(cas::compare_and_swap( + Op::Dml(op_addr), + cas::Predicate { + index: self.raft_storage.applied()?, + term: self.raft_storage.term()?, + ranges: ranges.clone(), + }, + deadline.saturating_duration_since(Instant::now()), + )); + handle_result!(cas::compare_and_swap( + Op::Dml(op_instance), + cas::Predicate { + index: self.raft_storage.applied()?, + term: self.raft_storage.term()?, + ranges, + }, + deadline.saturating_duration_since(Instant::now()), + )); + + self.main_loop.wakeup(); + return Ok((instance.into(), replication_addresses)); + } } /// Processes the [`rpc::update_instance::Request`] and appends @@ -349,17 +417,61 @@ impl Node { /// /// Returns `Ok(())` when the entry is committed. /// - /// Returns an error if the callee node isn't a raft leader. 
- /// /// **This function yields** + // TODO: for this function to be async and have an outer timeout wait_* fns need to be async pub fn handle_update_instance_request_and_wait( &self, req: rpc::update_instance::Request, + timeout: Duration, ) -> traft::Result<()> { - let notify = - self.raw_operation(|node_impl| node_impl.process_update_instance_request_async(req))?; - fiber::block_on(notify.recv_any())?; - Ok(()) + let deadline = instant_saturating_add(Instant::now(), timeout); + + loop { + let instance = self + .topology + .borrow() + .build_updated_instance(&req) + .map_err(RaftError::ConfChangeError)?; + let dml = Dml::replace(ClusterwideSpaceId::Instance, &instance) + .expect("encoding should not fail"); + + let ranges = vec![ + cas::Range::new(ClusterwideSpaceId::Instance), + cas::Range::new(ClusterwideSpaceId::Address), + cas::Range::new(ClusterwideSpaceId::Property) + .eq((PropertyName::ReplicationFactor,)), + ]; + let res = cas::compare_and_swap( + Op::Dml(dml), + cas::Predicate { + index: self.raft_storage.applied()?, + term: self.raft_storage.term()?, + ranges, + }, + deadline.saturating_duration_since(Instant::now()), + ); + match res { + Ok((index, term)) => { + self.wait_index(index, deadline.saturating_duration_since(Instant::now()))?; + if term != raft::Storage::term(&self.raft_storage, index)? { + // leader switched - retry + self.wait_status(); + continue; + } + } + Err(err) => { + if err.is_cas_err() | err.is_term_mismatch_err() { + // cas error - retry + fiber::sleep(Duration::from_millis(500)); + continue; + } else { + return Err(err); + } + } + } + self.main_loop.wakeup(); + return Ok(()); + } } /// Only the conf_change_loop on a leader is eligible to call this function. @@ -423,7 +535,7 @@ impl Node { pub(crate) struct NodeImpl { pub raw_node: RawNode, pub notifications: HashMap<LogicalClock, Notifier>, - topology_cache: KVCell<RaftTerm, Topology>, + topology: Rc<RefCell<Topology>>, joint_state_latch: KVCell<RaftIndex, oneshot::Sender<Result<(), RaftError>>>, storage: Clusterwide, raft_storage: RaftSpaceAccess, @@ -433,7 +545,11 @@ pub(crate) struct NodeImpl { } impl NodeImpl { - fn new(storage: Clusterwide, raft_storage: RaftSpaceAccess) -> Result<Self, RaftError> { + fn new( + storage: Clusterwide, + raft_storage: RaftSpaceAccess, + topology: Rc<RefCell<Topology>>, + ) -> Result<Self, RaftError> { let box_err = |e| StorageError::Other(Box::new(e)); let raft_id: RaftId = raft_storage @@ -471,7 +587,7 @@ impl NodeImpl { Ok(Self { raw_node, notifications: Default::default(), - topology_cache: KVCell::new(), + topology, joint_state_latch: KVCell::new(), storage, raft_storage, @@ -485,39 +601,6 @@ impl NodeImpl { self.raw_node.raft.id } - /// Provides mutable access to the Topology struct which reflects - /// uncommitted state of the cluster. Ensures the node is a leader. - /// In case it's not — returns an error. - /// - /// It's important to access topology through this function so that - /// new changes are consistent with uncommitted ones. - fn topology_mut(&mut self) -> Result<&mut Topology, Error> { - if self.raw_node.raft.state != RaftStateRole::Leader { - self.topology_cache.take(); // invalidate the cache - return Err(Error::NotALeader); - } - - let current_term = self.raw_node.raft.term; - - let topology: Topology = unwrap_some_or! { - self.topology_cache.take_or_drop(¤t_term), - { - let mut instances = vec![]; - for instance @ Instance { raft_id, .. } in self.storage.instances.iter()? 
{ - instances.push((instance, self.storage.peer_addresses.try_get(raft_id)?)) - } - let replication_factor = self - .storage - .properties - .get(PropertyName::ReplicationFactor)? - .ok_or_else(|| Error::other("missing replication_factor value in storage"))?; - Topology::new(instances).with_replication_factor(replication_factor) - } - }; - - Ok(self.topology_cache.insert(current_term, topology)) - } - pub fn read_index_async(&mut self) -> Result<Notify, RaftError> { // In some states `raft-rs` ignores the ReadIndex request. // Check it preliminary, don't wait for the timeout. @@ -588,89 +671,6 @@ impl NodeImpl { } } - /// Processes the [`rpc::join::Request`] and appends necessary - /// entries to the raft log (if successful). - /// - /// Returns an error if the callee node isn't a Raft leader. - /// - /// **This function doesn't yield** - pub fn process_join_request_async( - &mut self, - req: rpc::join::Request, - ) -> traft::Result<(Notify, Notify, HashSet<Address>)> { - let topology = self.topology_mut()?; - let (instance, address, replication_addresses) = topology - .join( - req.instance_id, - req.replicaset_id, - req.advertise_address, - req.failure_domain, - ) - .map_err(RaftError::ConfChangeError)?; - let peer_address = traft::PeerAddress { - raft_id: instance.raft_id, - address, - }; - let op_addr = Dml::replace(ClusterwideSpaceId::Address, &peer_address) - .expect("encoding should not fail"); - let op_instance = Dml::replace(ClusterwideSpaceId::Instance, &instance) - .expect("encoding should not fail"); - // Important! Calling `raw_node.propose()` may result in - // `ProposalDropped` error, but the topology has already been - // modified. The correct handling of this case should be the - // following. - // - // The `topology_cache` should be preserved. It won't be fully - // consistent anymore, but that's bearable. (TODO: examine how - // the particular requests are handled). At least it doesn't - // much differ from the case of overriding the entry due to a - // re-election. - // - // On the other hand, dropping topology_cache may be much more - // harmful. Loss of the uncommitted entries could result in - // assigning the same `raft_id` to a two different nodes. - Ok(( - self.propose_async(op_addr)?, - self.propose_async(op_instance)?, - replication_addresses, - )) - } - - /// Processes the [`rpc::update_instance::Request`] and appends - /// a corresponding [`Op::Dml`] entry to the raft log (if successful). - /// - /// Returns an error if the callee node isn't a Raft leader. - /// - /// **This function doesn't yield** - pub fn process_update_instance_request_async( - &mut self, - req: rpc::update_instance::Request, - ) -> traft::Result<Notify> { - let topology = self.topology_mut()?; - let instance = topology - .update_instance(req) - .map_err(RaftError::ConfChangeError)?; - // Important! Calling `raw_node.propose()` may result in - // `ProposalDropped` error, but the topology has already been - // modified. The correct handling of this case should be the - // following. - // - // The `topology_cache` should be preserved. It won't be fully - // consistent anymore, but that's bearable. (TODO: examine how - // the particular requests are handled). At least it doesn't - // much differ from the case of overriding the entry due to a - // re-election. - // - // On the other hand, dropping topology_cache may be much more - // harmful. Loss of the uncommitted entries could result in - // assigning the same `raft_id` to a two different nodes. 
- // - Ok(self.propose_async( - Dml::replace(ClusterwideSpaceId::Instance, &instance) - .expect("encoding should not fail"), - )?) - } - fn propose_conf_change_async( &mut self, term: RaftTerm, @@ -806,6 +806,8 @@ impl NodeImpl { let op = entry.into_op().unwrap_or(Op::Nop); tlog!(Debug, "applying entry: {op}"; "index" => index); + let mut instance_update = None; + let mut old_instance = None; match &op { Op::Dml(op) => { let space = op.space(); @@ -830,6 +832,20 @@ impl NodeImpl { // cannot exit during a transaction *expelled = true; } + if self + .storage + .instances + .contains(&instance.instance_id) + .expect("storage should not fail") + { + old_instance = Some( + self.storage + .instances + .get(&instance.instance_id) + .expect("storage should not fail"), + ); + } + instance_update = Some(instance); } } storage_changes.insert(space); @@ -1053,6 +1069,13 @@ impl NodeImpl { } } + // Keep topology in sync with storage + if let Some(instance_update) = instance_update { + self.topology + .borrow_mut() + .update(instance_update, old_instance) + } + if let Some(lc) = &lc { if let Some(notify) = self.notifications.remove(lc) { notify.notify_ok_any(result); @@ -1662,6 +1685,7 @@ impl MainLoop { return FlowControl::Break; } + // FIXME: potential deadlock - can't use sync mutex in async fn let mut node_impl = args.node_impl.lock(); // yields if state.stop_flag.take() { return FlowControl::Break; diff --git a/src/traft/topology.rs b/src/traft/topology.rs index f70a8719054dfce0920aa2667314a9614aa29f18..70e436f57fc6106eb552b92865ea1a9f653c6b03 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -1,4 +1,4 @@ -use std::collections::{BTreeMap, HashMap, HashSet}; +use std::collections::{BTreeMap, HashSet}; use crate::failure_domain::FailureDomain; use crate::has_grades; @@ -8,67 +8,54 @@ use crate::instance::grade::{ use crate::instance::{Instance, InstanceId}; use crate::replicaset::ReplicasetId; use crate::rpc; +use crate::storage::Clusterwide; use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; -use crate::traft::Address; use crate::traft::RaftId; use crate::util::Uppercase; +/// A shallow wrapper around the [Storage](Clusterwide), +/// providing topology related calculations. +/// +/// Should be mutated only when storage is mutated. +/// +/// With this in mind it should be accessible on any peer - not only leader. 
#[derive(Debug)] pub struct Topology { - replication_factor: u8, max_raft_id: RaftId, - failure_domain_names: HashSet<Uppercase>, - instance_map: HashMap<InstanceId, (Instance, Address)>, - replicaset_map: BTreeMap<ReplicasetId, HashSet<InstanceId>>, + replicasets: BTreeMap<ReplicasetId, HashSet<InstanceId>>, + storage: Clusterwide, } impl Topology { - #[inline(always)] - pub fn new(instances: impl IntoIterator<Item = (Instance, Address)>) -> Self { - let mut ret = Self { - replication_factor: 1, - max_raft_id: 0, - failure_domain_names: Default::default(), - instance_map: Default::default(), - replicaset_map: Default::default(), - }; - - for (instance, address) in instances { - ret.put_instance(instance, address); - } - - ret - } - - pub fn with_replication_factor(mut self, replication_factor: u8) -> Self { - self.replication_factor = replication_factor; - self - } - - fn put_instance(&mut self, instance: Instance, address: Address) { + pub fn update(&mut self, instance: Instance, old_instance: Option<Instance>) { self.max_raft_id = std::cmp::max(self.max_raft_id, instance.raft_id); let instance_id = instance.instance_id.clone(); let replicaset_id = instance.replicaset_id.clone(); - if let Some((old_instance, ..)) = self.instance_map.remove(&instance_id) { - self.replicaset_map + if let Some(old_instance) = old_instance { + self.replicasets .get_mut(&old_instance.replicaset_id) .map(|r| r.remove(&old_instance.instance_id)); } self.failure_domain_names .extend(instance.failure_domain.names().cloned()); - self.instance_map - .insert(instance_id.clone(), (instance, address)); - self.replicaset_map + self.replicasets .entry(replicaset_id) .or_default() .insert(instance_id); } + fn replication_factor(&self) -> usize { + self.storage + .properties + .replication_factor() + .expect("storage should not fail") + } + fn choose_instance_id(&self, raft_id: RaftId) -> InstanceId { let mut suffix: Option<u64> = None; loop { @@ -78,7 +65,12 @@ impl Topology { } .into(); - if !self.instance_map.contains_key(&ret) { + if !self + .storage + .instances + .contains(&ret) + .expect("storage should not fail") + { return ret; } @@ -87,10 +79,10 @@ impl Topology { } fn choose_replicaset_id(&self, failure_domain: &FailureDomain) -> ReplicasetId { - 'next_replicaset: for (replicaset_id, instances) in self.replicaset_map.iter() { - if instances.len() < self.replication_factor as usize { + 'next_replicaset: for (replicaset_id, instances) in self.replicasets.iter() { + if instances.len() < self.replication_factor() { for instance_id in instances { - let (instance, ..) 
= self.instance_map.get(instance_id).unwrap(); + let instance = self.storage.instances.get(instance_id).unwrap(); if instance.failure_domain.intersects(failure_domain) { continue 'next_replicaset; } @@ -103,7 +95,7 @@ impl Topology { loop { i += 1; let replicaset_id = ReplicasetId(format!("r{i}")); - if self.replicaset_map.get(&replicaset_id).is_none() { + if self.replicasets.get(&replicaset_id).is_none() { return replicaset_id; } } @@ -125,64 +117,74 @@ impl Topology { Err(format!("missing failure domain names: {}", res.join(", "))) } - pub fn join( - &mut self, - instance_id: Option<InstanceId>, - replicaset_id: Option<ReplicasetId>, - advertise: Address, - failure_domain: FailureDomain, - ) -> Result<(Instance, Address, HashSet<Address>), String> { - if let Some(id) = &instance_id { - let existing_instance = self.instance_map.get(id); - if matches!(existing_instance, Some((instance, ..)) if has_grades!(instance, Online -> *)) - { + pub fn build_instance( + &self, + instance_id: Option<&InstanceId>, + replicaset_id: Option<&ReplicasetId>, + failure_domain: &FailureDomain, + ) -> Result<Instance, String> { + if let Some(id) = instance_id { + let existing_instance = self.storage.instances.get(id); + if matches!(existing_instance, Ok(instance) if has_grades!(instance, Online -> *)) { let e = format!("{} is already joined", id); return Err(e); } } - self.check_required_failure_domain(&failure_domain)?; + self.check_required_failure_domain(failure_domain)?; // Anyway, `join` always produces a new raft_id. let raft_id = self.max_raft_id + 1; - let instance_id = instance_id.unwrap_or_else(|| self.choose_instance_id(raft_id)); + let instance_id = instance_id + .map(Clone::clone) + .unwrap_or_else(|| self.choose_instance_id(raft_id)); let instance_uuid = instance_uuid(&instance_id); - let replicaset_id = - replicaset_id.unwrap_or_else(|| self.choose_replicaset_id(&failure_domain)); + let replicaset_id = replicaset_id + .map(Clone::clone) + .unwrap_or_else(|| self.choose_replicaset_id(failure_domain)); let replicaset_uuid = replicaset_uuid(&replicaset_id); let instance = Instance { instance_id, instance_uuid, raft_id, - replicaset_id: replicaset_id.clone(), + replicaset_id, replicaset_uuid, current_grade: CurrentGrade::offline(0), target_grade: TargetGrade::offline(0), - failure_domain, + failure_domain: failure_domain.clone(), }; - self.put_instance(instance.clone(), advertise.clone()); - - let replication_ids = self.replicaset_map.get(&replicaset_id).unwrap(); - let replication_addresses = replication_ids - .iter() - .map(|id| self.instance_map.get(id).unwrap().1.clone()) - .collect(); + Ok(instance) + } - Ok((instance, advertise, replication_addresses)) + pub fn get_replication_ids(&self, replicaset_id: &ReplicasetId) -> HashSet<RaftId> { + if let Some(replication_ids) = self.replicasets.get(replicaset_id) { + replication_ids + .iter() + .map(|id| { + let instance = self + .storage + .instances + .get(id) + .expect("storage should not fail"); + instance.raft_id + }) + .collect() + } else { + HashSet::new() + } } - pub fn update_instance( - &mut self, - req: rpc::update_instance::Request, + pub fn build_updated_instance( + &self, + req: &rpc::update_instance::Request, ) -> Result<Instance, String> { - let this = self as *const Self; - - let (instance, ..) 
= self - .instance_map - .get_mut(&req.instance_id) - .ok_or_else(|| format!("unknown instance {}", req.instance_id))?; + let mut instance = self + .storage + .instances + .get(&req.instance_id) + .map_err(|err| err.to_string())?; if instance.current_grade == CurrentGradeVariant::Expelled && !matches!( @@ -192,7 +194,7 @@ impl Topology { current_grade: Some(current_grade), failure_domain: None, .. - } if current_grade == CurrentGradeVariant::Expelled + } if *current_grade == CurrentGradeVariant::Expelled ) { return Err(format!( @@ -201,12 +203,9 @@ impl Topology { )); } - if let Some(fd) = req.failure_domain { - // SAFETY: this is safe, because rust doesn't complain if you inline - // the function - unsafe { &*this }.check_required_failure_domain(&fd)?; - self.failure_domain_names.extend(fd.names().cloned()); - instance.failure_domain = fd; + if let Some(fd) = req.failure_domain.as_ref() { + self.check_required_failure_domain(fd)?; + instance.failure_domain = fd.clone(); } if let Some(value) = req.current_grade { @@ -224,7 +223,26 @@ impl Topology { }; } - Ok(instance.clone()) + Ok(instance) + } +} + +impl From<Clusterwide> for Topology { + fn from(storage: Clusterwide) -> Self { + let instances = storage + .instances + .all_instances() + .expect("storage should not fail"); + let mut topology = Self { + max_raft_id: 0, + failure_domain_names: Default::default(), + replicasets: Default::default(), + storage, + }; + for instance in instances { + topology.update(instance, None); + } + topology } } @@ -232,15 +250,25 @@ impl Topology { pub fn initial_instance( instance_id: Option<InstanceId>, replicaset_id: Option<ReplicasetId>, - advertise: Address, failure_domain: FailureDomain, -) -> Result<(Instance, Address, HashSet<Address>), String> { - let mut topology = Topology::new(vec![]); - topology.join(instance_id, replicaset_id, advertise, failure_domain) +) -> Instance { + let instance_id = instance_id.unwrap_or_else(|| "i1".into()); + let replicaset_id = replicaset_id.unwrap_or_else(|| ReplicasetId::from("r1")); + let instance_uuid = instance_uuid(&instance_id); + let replicaset_uuid = replicaset_uuid(&replicaset_id); + Instance { + instance_id, + raft_id: 1, + replicaset_id, + current_grade: CurrentGrade::offline(0), + target_grade: TargetGrade::offline(0), + failure_domain, + instance_uuid, + replicaset_uuid, + } } #[rustfmt::skip] -#[cfg(test)] mod tests { use std::collections::HashSet; @@ -248,11 +276,12 @@ mod tests { use crate::failure_domain::FailureDomain; use crate::instance::grade::{CurrentGrade, Grade, TargetGrade, TargetGradeVariant}; + use crate::replicaset::ReplicasetId; + use crate::storage::Clusterwide; use crate::traft::instance_uuid; use crate::traft::replicaset_uuid; use crate::traft::Instance; use crate::rpc; - use pretty_assertions::assert_eq; trait IntoGrade<T> { fn into_grade(self) -> Grade<T>; @@ -288,7 +317,7 @@ mod tests { macro_rules! instances { [ $( ( $($instance:tt)+ ) ),* $(,)? ] => { - vec![$( (instance!($($instance)+), "who-cares.biz".into()) ),*] + vec![$( instance!($($instance)+) ),*] }; } @@ -316,34 +345,27 @@ mod tests { $( let _f = $failure_domain; )? _f }, - .. Instance::default() } }; } - macro_rules! addresses { - [$($address:literal),*] => [HashSet::from([$($address.to_string()),*])] - } - - macro_rules! join { + macro_rules! build_instance { ( $topology:expr, $instance_id:expr, - $replicaset_id:expr, - $advertise_address:literal + $replicaset_id:expr $(, $failure_domain:expr )? $(,)? 
) => { - $topology.join( - $instance_id.map(<&str>::into), - $replicaset_id.map(<&str>::into), - $advertise_address.into(), - { - let _f = FailureDomain::default(); - $(let _f = $failure_domain; )? - _f - }, - ) + { + let _f = FailureDomain::default(); + $(let _f = $failure_domain; )? + $topology.build_instance( + $instance_id.map(<&str>::into).as_ref(), + $replicaset_id.map(<&str>::into).as_ref(), + &_f + ) + } }; } @@ -355,16 +377,14 @@ mod tests { $(, $target_grade:expr)?)? $(,)? ) => { - $topology.update_instance( - { - let req = rpc::update_instance::Request::new($instance_id.into(), "".into()); - $( - let req = $current_grade.modify(req); - $( let req = $target_grade.modify(req); )? - )? - req - } - ) + { + let req = rpc::update_instance::Request::new($instance_id.into(), "".into()); + $( + let req = $current_grade.modify(req); + $( let req = $target_grade.modify(req); )? + )? + $topology.build_updated_instance(&req) + } }; } @@ -374,8 +394,8 @@ mod tests { $instance_id:expr, $failure_domain:expr $(,)? ) => { - $topology.update_instance( - rpc::update_instance::Request::new($instance_id.into(), "".into()) + $topology.build_updated_instance( + &rpc::update_instance::Request::new($instance_id.into(), "".into()) .with_failure_domain($failure_domain), ) }; @@ -388,47 +408,60 @@ mod tests { } } - #[test] - fn test_simple() { - let mut topology = Topology::new(vec![]).with_replication_factor(1); + fn new_topology(storage: &Clusterwide, instances: Vec<Instance>, replication_factor: usize) -> Topology { + for instance in instances { + storage.instances.put(&instance).unwrap(); + } + storage.properties.put(crate::storage::PropertyName::ReplicationFactor, &replication_factor).unwrap(); + Topology::from(storage.clone()) + } - assert_eq!( - join!(topology, None, None, "addr:1").unwrap(), - (instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), - ); + #[::tarantool::test] + fn test_simple() { + let storage = Clusterwide::new().unwrap(); + let mut topology = new_topology(&storage, vec![], 1); + let instance = build_instance!(topology, None, None).unwrap(); assert_eq!( - join!(topology, None, None, "addr:1").unwrap(), - (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), + instance, + instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); + let instance = build_instance!(topology, None, None).unwrap(); assert_eq!( - join!(topology, None, Some("R3"), "addr:1").unwrap(), - (instance!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), + instance, + instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); + let instance = build_instance!(topology, None, Some("R3")).unwrap(); assert_eq!( - join!(topology, Some("I4"), None, "addr:1").unwrap(), - (instance!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), + instance, + instance!(3, "i3", "R3", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); - let mut topology = Topology::new( - instances![(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0))] - ).with_replication_factor(1); - + let instance = 
build_instance!(topology, Some("I4"), None).unwrap(); assert_eq!( - join!(topology, None, None, "addr:1").unwrap(), - (instance!(2, "i2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), + instance, + instance!(4, "I4", "r3", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); } - #[test] + #[::tarantool::test] fn test_override() { - let mut topology = Topology::new(instances![ + let storage = Clusterwide::new().unwrap(); + let topology = new_topology(&storage, instances![ (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), (2, "i2", "r2-original", CurrentGrade::offline(0), TargetGrade::offline(0)), - ]) - .with_replication_factor(2); + ], + 2); // join::Request with a given instance_id online. // - It must be an impostor, return an error. @@ -436,9 +469,8 @@ mod tests { // unreachable soon (when we implement failover) an the error // will be gone. assert_eq!( - join!(topology, Some("i1"), None, "active:2") - .unwrap_err() - .to_string(), + build_instance!(topology, Some("i1"), None) + .unwrap_err(), "i1 is already joined", ); @@ -453,12 +485,13 @@ mod tests { // - Even if it's an impostor, rely on auto-expel policy. // Disruption isn't destructive if auto-expel allows (TODO). assert_eq!( - join!(topology, Some("i2"), None, "inactive:2").unwrap(), - (instance!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "inactive:2".into(), addresses!["who-cares.biz", "inactive:2"]), + build_instance!(topology, Some("i2"), None).unwrap(), + (instance!(3, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0))), // Attention: generated replicaset_id differs from the // original one, as well as raft_id. // That's a desired behavior. ); + assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r1")), HashSet::from([1])); // TODO // @@ -473,249 +506,289 @@ mod tests { // bootstrapping yet) will be disrupted. 
} - #[test] + #[::tarantool::test] fn test_instance_id_collision() { - let mut topology = Topology::new(instances![ + let storage = Clusterwide::new().unwrap(); + let topology = new_topology(&storage, instances![ (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), (2, "i3", "r3", CurrentGrade::online(1), TargetGrade::online(1)), // Attention: i3 has raft_id=2 - ]); + ], 1); assert_eq!( - join!(topology, None, Some("r2"), "addr:2").unwrap(), - (instance!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into(), addresses!["addr:2"]), + build_instance!(topology, None, Some("r2")).unwrap(), + instance!(3, "i3-2", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); } - #[test] + #[::tarantool::test] fn test_replication_factor() { - let mut topology = Topology::new(instances![ + let storage = Clusterwide::new().unwrap(); + let mut topology = new_topology(&storage, instances![ (9, "i9", "r9", CurrentGrade::online(1), TargetGrade::online(1)), (10, "i10", "r9", CurrentGrade::online(1), TargetGrade::online(1)), - ]) - .with_replication_factor(2); + ], + 2); + let instance = build_instance!(topology, Some("i1"), None).unwrap(); assert_eq!( - join!(topology, Some("i1"), None, "addr:1").unwrap(), - (instance!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:1".into(), addresses!["addr:1"]), + instance, + instance!(11, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); + assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r1")), HashSet::from([11])); + + let instance = build_instance!(topology, Some("i2"), None).unwrap(); assert_eq!( - join!(topology, Some("i2"), None, "addr:2").unwrap(), - (instance!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:2".into(), addresses!["addr:1", "addr:2"]), + instance, + instance!(12, "i2", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); + assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r1")), HashSet::from([11, 12])); + + let instance = build_instance!(topology, Some("i3"), None).unwrap(); assert_eq!( - join!(topology, Some("i3"), None, "addr:3").unwrap(), - (instance!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:3".into(), addresses!["addr:3"]), + instance, + instance!(13, "i3", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); + assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r2")), HashSet::from([13])); + + let instance = build_instance!(topology, Some("i4"), None).unwrap(); assert_eq!( - join!(topology, Some("i4"), None, "addr:4").unwrap(), - (instance!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), "addr:4".into(), addresses!["addr:3", "addr:4"]), + instance, + instance!(14, "i4", "r2", CurrentGrade::offline(0), TargetGrade::offline(0)), ); + storage.instances.put(&instance).unwrap(); + topology.update(instance, None); + assert_eq!(topology.get_replication_ids(&ReplicasetId::from("r2")), HashSet::from([13, 14])); } - #[test] + #[::tarantool::test] fn test_update_grade() { - let mut topology = Topology::new(instances![ - (1, "i1", "r1", CurrentGrade::online(1), TargetGrade::online(1)), - ]) - .with_replication_factor(1); + let storage = Clusterwide::new().unwrap(); + let instance_v0 = instance!(1, "i1", 
"r1", CurrentGrade::online(1), TargetGrade::online(1)); + let mut topology = new_topology(&storage, vec![instance_v0.clone()], 1); // Current grade incarnation is allowed to go down, // governor has the authority over it + let instance_v1 = set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(); + storage.instances.put(&instance_v1).unwrap(); + topology.update(instance_v1.clone(), Some(instance_v0)); assert_eq!( - set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(), + instance_v1, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // idempotency + let instance_v2 = set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(); + storage.instances.put(&instance_v2).unwrap(); + topology.update(instance_v2.clone(), Some(instance_v1)); assert_eq!( - set_grade!(topology, "i1", CurrentGrade::offline(0)).unwrap(), + instance_v2, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // TargetGradeVariant::Offline takes incarnation from current grade + let instance_v3 = set_grade!(topology, "i1", TargetGradeVariant::Offline).unwrap(); + storage.instances.put(&instance_v3).unwrap(); + topology.update(instance_v3.clone(), Some(instance_v2)); assert_eq!( - set_grade!(topology, "i1", TargetGradeVariant::Offline).unwrap(), + instance_v3, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::offline(0)), ); // TargetGradeVariant::Online increases incarnation + let instance_v4 = set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(); + storage.instances.put(&instance_v4).unwrap(); + topology.update(instance_v4.clone(), Some(instance_v3)); assert_eq!( - set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(), + instance_v4, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(1)), ); // No idempotency, incarnation goes up + let instance_v5 = set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(); + storage.instances.put(&instance_v5).unwrap(); + topology.update(instance_v5.clone(), Some(instance_v4)); assert_eq!( - set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap(), + instance_v5, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::online(2)), ); // TargetGrade::Expelled takes incarnation from current grade + let instance_v6 = set_grade!(topology, "i1", TargetGradeVariant::Expelled).unwrap(); + storage.instances.put(&instance_v6).unwrap(); + topology.update(instance_v6.clone(), Some(instance_v5)); assert_eq!( - set_grade!(topology, "i1", TargetGradeVariant::Expelled).unwrap(), + instance_v6, instance!(1, "i1", "r1", CurrentGrade::offline(0), TargetGrade::expelled(0)), ); // Instance get's expelled + let instance_v7 = set_grade!(topology, "i1", CurrentGrade::expelled(69)).unwrap(); + storage.instances.put(&instance_v7).unwrap(); + topology.update(instance_v7.clone(), Some(instance_v6)); assert_eq!( - set_grade!(topology, "i1", CurrentGrade::expelled(69)).unwrap(), + instance_v7, instance!(1, "i1", "r1", CurrentGrade::expelled(69), TargetGrade::expelled(0)), ); // Updating expelled instances isn't allowed assert_eq!( - set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap_err().to_string(), + set_grade!(topology, "i1", TargetGradeVariant::Online).unwrap_err(), "cannot update expelled instance \"i1\"", ); } - #[test] + #[::tarantool::test] fn failure_domain() { - let mut t = Topology::new(instances![]).with_replication_factor(3); - - assert_eq!( - join!(t, None, None, "-", faildoms! 
{planet: Earth}) - .unwrap() - .0 - .replicaset_id, - "r1", - ); - - assert_eq!( - join!(t, None, None, "-", faildoms! {planet: Earth}) - .unwrap() - .0 - .replicaset_id, - "r2", - ); - - assert_eq!( - join!(t, None, None, "-", faildoms! {planet: Mars}) - .unwrap() - .0 - .replicaset_id, - "r1", - ); + let storage = Clusterwide::new().unwrap(); + let mut t = new_topology(&storage, vec![], 3); + + let instance = + build_instance!(t, None, None, faildoms! {planet: Earth}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r1"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); - assert_eq!( - join!(t, None, None, "-", faildoms! {planet: Earth, os: BSD}) - .unwrap() - .0 - .replicaset_id, - "r3", - ); + let instance = + build_instance!(t, None, None, faildoms! {planet: Earth}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r2"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); - assert_eq!( - join!(t, None, None, "-", faildoms! {planet: Mars, os: BSD}) - .unwrap() - .0 - .replicaset_id, - "r2", - ); + let instance = + build_instance!(t, None, None, faildoms! {planet: Mars}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r1"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); + + let instance = + build_instance!(t, None, None, faildoms! {planet: Earth, os: BSD}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r3"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); + + let instance = + build_instance!(t, None, None, faildoms! {planet: Mars, os: BSD}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r2"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); assert_eq!( - join!(t, None, None, "-", faildoms! {os: Arch}) - .unwrap_err() - .to_string(), + build_instance!(t, None, None, faildoms! {os: Arch}) + .unwrap_err(), "missing failure domain names: PLANET", ); - assert_eq!( - join!(t, None, None, "-", faildoms! {planet: Venus, os: Arch}) - .unwrap() - .0 - .replicaset_id, - "r1", - ); + let instance = + build_instance!(t, None, None, faildoms! {planet: Venus, os: Arch}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r1"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); - assert_eq!( - join!(t, None, None, "-", faildoms! {planet: Venus, os: Mac}) - .unwrap() - .0 - .replicaset_id, - "r2", - ); + let instance = + build_instance!(t, None, None, faildoms! {planet: Venus, os: Mac}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r2"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); - assert_eq!( - join!(t, None, None, "-", faildoms! {planet: Mars, os: Mac}) - .unwrap() - .0 - .replicaset_id, - "r3", - ); + let instance = + build_instance!(t, None, None, faildoms! {planet: Mars, os: Mac}) + .unwrap(); + assert_eq!(instance.replicaset_id, "r3"); + storage.instances.put(&instance).unwrap(); + t.update(instance, None); assert_eq!( - join!(t, None, None, "-", faildoms! {}) - .unwrap_err() - .to_string(), + build_instance!(t, None, None, faildoms! {}) + .unwrap_err(), "missing failure domain names: OS, PLANET", ); } - #[test] + #[::tarantool::test] fn reconfigure_failure_domain() { - let mut t = Topology::new(instances![]).with_replication_factor(3); + let storage = Clusterwide::new().unwrap(); + let mut t = new_topology(&storage, instances![], 3); // first instance - let (instance, ..) = join!(t, Some("i1"), None, "-", faildoms! {planet: Earth}).unwrap(); - assert_eq!(instance.failure_domain, faildoms! 
-        assert_eq!(instance.replicaset_id, "r1");
+        let instance1_v1 = build_instance!(t, Some("i1"), None, faildoms! {planet: Earth}).unwrap();
+        storage.instances.put(&instance1_v1).unwrap();
+        t.update(instance1_v1.clone(), None);
+        assert_eq!(instance1_v1.failure_domain, faildoms! {planet: Earth});
+        assert_eq!(instance1_v1.replicaset_id, "r1");
 
         // reconfigure single instance, fail
         assert_eq!(
             set_faildoms!(t, "i1", faildoms! {owner: Ivan})
-                .unwrap_err()
-                .to_string(),
+                .unwrap_err(),
             "missing failure domain names: PLANET",
         );
 
         // reconfigure single instance, success
-        let instance = set_faildoms!(t, "i1", faildoms! {planet: Mars, owner: Ivan}).unwrap();
-        assert_eq!(instance.failure_domain, faildoms! {planet: Mars, owner: Ivan});
-        assert_eq!(instance.replicaset_id, "r1"); // same replicaset
+        let instance1_v2 = set_faildoms!(t, "i1", faildoms! {planet: Mars, owner: Ivan}).unwrap();
+        storage.instances.put(&instance1_v2).unwrap();
+        t.update(instance1_v2.clone(), Some(instance1_v1));
+        assert_eq!(instance1_v2.failure_domain, faildoms! {planet: Mars, owner: Ivan});
+        assert_eq!(instance1_v2.replicaset_id, "r1"); // same replicaset
 
         // second instance won't be joined without the newly added required
         // failure domain subdivision of "OWNER"
         assert_eq!(
-            join!(t, Some("i2"), None, "-", faildoms! {planet: Mars})
-                .unwrap_err()
-                .to_string(),
+            build_instance!(t, Some("i2"), None, faildoms! {planet: Mars})
+                .unwrap_err(),
             "missing failure domain names: OWNER",
         );
 
         // second instance
         #[rustfmt::skip]
-        let (instance, ..) = join!(t, Some("i2"), None, "-", faildoms! {planet: Mars, owner: Mike})
+        let instance2_v1 = build_instance!(t, Some("i2"), None, faildoms! {planet: Mars, owner: Mike})
             .unwrap();
-        assert_eq!(instance.failure_domain, faildoms! {planet: Mars, owner: Mike});
+        storage.instances.put(&instance2_v1).unwrap();
+        t.update(instance2_v1.clone(), None);
+        assert_eq!(instance2_v1.failure_domain, faildoms! {planet: Mars, owner: Mike});
         // doesn't fit into r1
-        assert_eq!(instance.replicaset_id, "r2");
+        assert_eq!(instance2_v1.replicaset_id, "r2");
 
         // reconfigure second instance, success
-        let instance = set_faildoms!(t, "i2", faildoms! {planet: Earth, owner: Mike}).unwrap();
-        assert_eq!(instance.failure_domain, faildoms! {planet: Earth, owner: Mike});
+        let instance2_v2 = set_faildoms!(t, "i2", faildoms! {planet: Earth, owner: Mike}).unwrap();
+        storage.instances.put(&instance2_v2).unwrap();
+        t.update(instance2_v2.clone(), Some(instance2_v1));
+        assert_eq!(instance2_v2.failure_domain, faildoms! {planet: Earth, owner: Mike});
         // replicaset doesn't change automatically
-        assert_eq!(instance.replicaset_id, "r2");
+        assert_eq!(instance2_v2.replicaset_id, "r2");
 
         // add instance with new subdivision
         #[rustfmt::skip]
-        let (instance, ..) = join!(t, Some("i3"), None, "-", faildoms! {planet: B, owner: V, dimension: C137})
+        let instance3_v1 = build_instance!(t, Some("i3"), None, faildoms! {planet: B, owner: V, dimension: C137})
             .unwrap();
+        storage.instances.put(&instance3_v1).unwrap();
+        t.update(instance3_v1.clone(), None);
         assert_eq!(
-            instance.failure_domain,
+            instance3_v1.failure_domain,
             faildoms! {planet: B, owner: V, dimension: C137}
         );
-        assert_eq!(instance.replicaset_id, "r1");
+        assert_eq!(instance3_v1.replicaset_id, "r1");
 
         // even though the only instance with failure domain subdivision of
         // `DIMENSION` is inactive, we can't add an instance without that
         // subdivision
         #[rustfmt::skip]
         assert_eq!(
-            join!(t, Some("i4"), None, "-", faildoms! {planet: Theia, owner: Me})
-                .unwrap_err()
-                .to_string(),
+            build_instance!(t, Some("i4"), None, faildoms! {planet: Theia, owner: Me})
+                .unwrap_err(),
             "missing failure domain names: DIMENSION",
         );
     }
diff --git a/src/util.rs b/src/util.rs
index 6c6f813664436e0cacd1bbf3822b9352242a8f40..471cd14fd53eedaa8b6aee4ae6501e9119d7ec96 100644
--- a/src/util.rs
+++ b/src/util.rs
@@ -14,7 +14,7 @@ use std::time::{Duration, Instant};
 
 const INFINITY: Duration = Duration::from_secs(30 * 365 * 24 * 60 * 60);
 
-// TODO: move to tarantool_module when we have custom `Instance` there
+// TODO: move to tarantool_module when we have custom `Instant` there.
 pub fn instant_saturating_add(t: Instant, d: Duration) -> Instant {
     t.checked_add(d)
         .unwrap_or_else(|| t.checked_add(INFINITY).expect("that's too much, man"))
@@ -22,8 +22,9 @@ pub fn instant_saturating_add(t: Instant, d: Duration) -> Instant {
 
 // TODO: move to tarantool_module
 pub async fn sleep_async(time: Duration) {
-    let (_, rx) = fiber::r#async::oneshot::channel::<()>();
+    let (tx, rx) = fiber::r#async::oneshot::channel::<()>();
     rx.timeout(time).await.unwrap_err();
+    drop(tx);
 }
 
 ////////////////////////////////////////////////////////////////////////////////
@@ -565,8 +566,12 @@ mod tests {
 mod tarantool_tests {
     use std::time::Duration;
 
+    use ::tarantool::fiber;
+
     #[::tarantool::test]
-    async fn sleep_wakes_up() {
-        super::sleep_async(Duration::from_millis(10)).await;
+    fn sleep_wakes_up() {
+        let should_yield =
+            fiber::check_yield(|| fiber::block_on(super::sleep_async(Duration::from_millis(10))));
+        assert_eq!(should_yield, fiber::YieldResult::Yielded(()));
     }
 }
diff --git a/test/int/test_cas.py b/test/int/test_cas.py
index f42ddfcec9ffac7691dcea338a2e64581266d71f..e4bfc8cb006d84e36d6a0c65f008d72468bf96dc 100644
--- a/test/int/test_cas.py
+++ b/test/int/test_cas.py
@@ -194,8 +194,7 @@ def test_cas_lua_api(cluster: Cluster):
         ranges=[CasRange(eq="fruit")],
     )
     assert e5.value.args == (
-        "network error: service responded with error: "
-        + "compare-and-swap request failed: "
+        "compare-and-swap request failed: "
         + f"comparison failed for index {read_index} "
         + f"as it conflicts with {read_index+1}",
     )
diff --git a/test/int/test_joining.py b/test/int/test_joining.py
index 5a5744b7f7e46b7a90ae610dd15a6790fb526b4a..139f08f9cb6376431e9b12285105bed298393a50 100644
--- a/test/int/test_joining.py
+++ b/test/int/test_joining.py
@@ -54,11 +54,11 @@ def test_request_follower(cluster2: Cluster):
     i1, i2 = cluster2.instances
     i2.assert_raft_status("Follower")
 
-    expected = [{"ErrNotALeader": {"raft_id": 1, "address": i1.listen}}]
     actual = raft_join(
         instance=i2, cluster_id=cluster2.id, instance_id="fake-0", timeout_seconds=1
    )
-    assert actual == expected
+    # Even though a follower is called, the new instance is joined successfully
+    assert actual[0]["instance"]["raft_id"] == 3
 
 
 def test_discovery(cluster3: Cluster):
@@ -353,63 +353,3 @@ def test_fail_to_join(cluster: Cluster):
         """
     )
     assert {tuple(i) for i in joined_instances} == {(i1.instance_id, i1.raft_id)}
-
-
-def test_not_a_leader_at_postjoin(cluster: Cluster):
-    # Scenario: join instance even if leader changed at postjoin step
-    # Given a cluster
-    # When new instance join the leader
-    # And leader has been changed moment before the postjoin step
-    # Then the joining instance does not fall
-    # And the joining instance joined to new leader
-
-    cluster.deploy(instance_count=2)
-    i1, i2 = cluster.instances
-    i1.assert_raft_status("Leader")
-    i1.eval(
-        """
-        local args = ...
-        box.schema.func.drop(".proc_update_instance")
-        _G[""] = { proc_update_instance = function()
-            box.schema.func.create(".proc_update_instance", {language="C", if_not_exists=true})
-            require("net.box").connect(args.addr):call("pico.raft_timeout_now")
-            return {'ErrNotALeader'}
-        end }
-        """,
-        dict(addr=i2.listen),
-    )
-    i3 = cluster.add_instance()
-
-    i1.assert_raft_status("Follower")
-    i2.assert_raft_status("Leader")
-    i3.assert_raft_status("Follower")
-
-
-def test_not_a_leader_at_start_join(cluster: Cluster):
-    # Scenario: join instance even if leader changed at start_join step
-    # Given a cluster
-    # When new instance join the leader
-    # And leader has been changed moment before the start_join step
-    # Then the joining instance does not fall
-    # And the joining instance joined to new leader
-
-    cluster.deploy(instance_count=2)
-    i1, i2 = cluster.instances
-    i1.assert_raft_status("Leader")
-    i1.eval(
-        """
-        args = ...
-        box.schema.func.drop(".proc_raft_join")
-        _G[""] = { proc_raft_join = function()
-            box.schema.func.create(".proc_raft_join", {language="C", if_not_exists=true})
-            require("net.box").connect(args.addr):call("pico.raft_timeout_now")
-            return {{["ErrNotALeader"] = {raft_id=args.id, address=args.addr}}}
-        end }
-        """,
-        dict(id=i2.raft_id, addr=i2.listen),
-    )
-    i3 = cluster.add_instance()
-
-    i1.assert_raft_status("Follower")
-    i2.assert_raft_status("Leader")
-    i3.assert_raft_status("Follower")