diff --git a/CHANGELOG.md b/CHANGELOG.md index ad60095bf86cb5beb9df2635b65d414bceab133f..bf0fb58f48e5b1a6146a8388aad8559f33090582 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -8,6 +8,12 @@ with the `YY.0M.MICRO` scheme. <img src="https://img.shields.io/badge/calver-YY.0M.MICRO-22bfda.svg"> +## HEAD + +### Features + +- Add Expel Node feature + ## [22.07.0] - 2022-07-08 ### Basic functionality diff --git a/src/args.rs b/src/args.rs index 1959fe43acad32a6ca9ed3a80bb912208a8557f6..067594791e829b0d4810445786dbc0d97f2ca47d 100644 --- a/src/args.rs +++ b/src/args.rs @@ -15,6 +15,7 @@ use crate::util::Uppercase; pub enum Picodata { Run(Run), Tarantool(Tarantool), + Expel(Expel), Test(Test), } @@ -189,6 +190,38 @@ impl Tarantool { } } +//////////////////////////////////////////////////////////////////////////////// +/// Expel +//////////////////////////////////////////////////////////////////////////////// + +#[derive(Debug, Parser, tlua::Push)] +#[clap(about = "Expel node from cluster")] +pub struct Expel { + #[clap(long, value_name = "name", default_value = "demo")] + /// Name of the cluster from instance should be expelled. + pub cluster_id: String, + + #[clap(long, value_name = "name", default_value = "")] + /// Name of the instance to expel. + pub instance_id: String, + + #[clap( + long = "peer", + value_name = "[host][:port]", + parse(try_from_str = try_parse_address), + default_value = "localhost:3301", + )] + /// Address of any instance from the cluster. + pub peer: String, +} + +impl Expel { + // Get the arguments that will be passed to `tarantool_main` + pub fn tt_args(&self) -> Result<Vec<CString>, String> { + Ok(vec![current_exe()?]) + } +} + //////////////////////////////////////////////////////////////////////////////// // Test //////////////////////////////////////////////////////////////////////////////// diff --git a/src/main.rs b/src/main.rs index 4b7e95a83d82f7000e4ab7c47615483747d6f03e..69d0691102977857e4a34e4916da646f826fabc2 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,6 +11,7 @@ use ::tarantool::tlua; use ::tarantool::transaction::start_transaction; use std::convert::TryFrom; use std::time::{Duration, Instant}; +use traft::ExpelRequest; use clap::StructOpt as _; use protobuf::Message as _; @@ -89,6 +90,12 @@ fn picolib_setup(args: &args::Run) { tarantool::exit(code.unwrap_or(0)) }), ); + luamod.set( + "expel", + tlua::function1(|instance_id: String| -> Result<(), Error> { + traft::node::expel_wrapper(instance_id) + }), + ); #[derive(::tarantool::tlua::LuaRead)] struct ProposeEvalOpts { timeout: Option<f64>, @@ -272,6 +279,8 @@ fn init_handlers() { declare_cfunc!(discovery::proc_discover); declare_cfunc!(traft::node::raft_interact); declare_cfunc!(traft::node::raft_join); + declare_cfunc!(traft::node::raft_expel_on_leader); + declare_cfunc!(traft::node::raft_expel); declare_cfunc!(traft::failover::raft_update_peer); } @@ -297,6 +306,7 @@ fn main() -> ! { args::Picodata::Run(args) => main_run(args), args::Picodata::Test(args) => main_test(args), args::Picodata::Tarantool(args) => main_tarantool(args), + args::Picodata::Expel(args) => main_expel(args), } } @@ -854,6 +864,36 @@ fn main_tarantool(args: args::Tarantool) -> ! { std::process::exit(rc); } +fn main_expel(args: args::Expel) -> ! { + let rc = tarantool_main!( + args.tt_args().unwrap(), + callback_data: (args,), + callback_data_type: (args::Expel,), + callback_body: { + tt_expel(args) + } + ); + std::process::exit(rc); +} + +fn tt_expel(args: args::Expel) { + let fn_name = stringify_cfunc!(traft::node::raft_expel); + let req = ExpelRequest { + cluster_id: args.cluster_id, + instance_id: args.instance_id, + }; + match tarantool::net_box_call(&args.peer, fn_name, &req, Duration::MAX) { + Ok::<traft::ExpelResponse, _>(_resp) => { + tlog!(Info, "Success expel call"); + std::process::exit(0); + } + Err(e) => { + tlog!(Error, "Failed to expel instance: {e}"); + std::process::exit(-1); + } + } +} + macro_rules! color { (@priv red) => { "\x1b[0;31m" }; (@priv green) => { "\x1b[0;32m" }; diff --git a/src/traft/error.rs b/src/traft/error.rs index 1d6a686a63e536a96afdb8515446d05c6a72fd51..a25767444eabef13732ad3bca617aee7395bd86f 100644 --- a/src/traft/error.rs +++ b/src/traft/error.rs @@ -24,6 +24,8 @@ pub enum Error { }, #[error("error during execution of lua code: {0}")] Lua(#[from] LuaError), + #[error("other error")] + Other(Box<dyn std::error::Error>), } #[derive(Debug, Error)] diff --git a/src/traft/mod.rs b/src/traft/mod.rs index d832cf23ccb7d7679e297cc3a79c1f225e051bdb..d4844d658dacf1cda0536606edf462bc4cd8e014 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -561,6 +561,17 @@ pub struct JoinResponse { } impl Encode for JoinResponse {} +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ExpelRequest { + pub cluster_id: String, + pub instance_id: String, +} +impl Encode for ExpelRequest {} + +#[derive(Clone, Debug, Serialize, Deserialize)] +pub struct ExpelResponse {} +impl Encode for ExpelResponse {} + /////////////////////////////////////////////////////////////////////////////// /// Activity state of an instance. #[derive(PartialEq, Eq, Clone, Debug, Deserialize, Serialize)] @@ -569,6 +580,8 @@ pub enum Health { Online, // Instance has gracefully shut down. Offline, + // Instance has permanently removed from cluster. + Expelled, } impl Health { @@ -576,6 +589,7 @@ impl Health { match self { Self::Online => "Online", Self::Offline => "Offline", + Self::Expelled => "Expelled", } } } @@ -624,6 +638,16 @@ impl UpdatePeerRequest { } } + #[inline] + pub fn set_expelled(instance_id: impl Into<String>, cluster_id: impl Into<String>) -> Self { + Self { + health: Health::Expelled, + instance_id: instance_id.into(), + cluster_id: cluster_id.into(), + failure_domain: None, + } + } + #[inline] pub fn set_failure_domain(&mut self, failure_domain: FailureDomain) { self.failure_domain = Some(failure_domain); diff --git a/src/traft/node.rs b/src/traft/node.rs index b159c8db1381d6ebbcf0ead0bd661ef6c05b1ced..1851c1e5ed985f75ecc4681715afb630e8503fbf 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -47,8 +47,9 @@ use crate::traft::Op; use crate::traft::Storage; use crate::traft::Topology; use crate::traft::TopologyRequest; -use crate::traft::{JoinRequest, JoinResponse, UpdatePeerRequest}; +use crate::traft::{ExpelRequest, ExpelResponse, JoinRequest, JoinResponse, UpdatePeerRequest}; +use super::Health; use super::OpResult; type RawNode = raft::RawNode<Storage>; @@ -298,7 +299,6 @@ impl Node { advertise_address, failure_domain, ), - TopologyRequest::UpdatePeer(UpdatePeerRequest { instance_id, health, @@ -462,9 +462,13 @@ fn handle_committed_entries( }; match entry.entry_type { - raft::EntryType::EntryNormal => { - handle_committed_normal_entry(entry, notifications, pool, topology_changed) - } + raft::EntryType::EntryNormal => handle_committed_normal_entry( + entry, + notifications, + pool, + topology_changed, + raw_node, + ), raft::EntryType::EntryConfChange | raft::EntryType::EntryConfChangeV2 => { handle_committed_conf_change(entry, raw_node) } @@ -487,6 +491,7 @@ fn handle_committed_normal_entry( notifications: &mut HashMap<LogicalClock, Notify>, pool: &mut ConnectionPool, topology_changed: &mut bool, + raw_node: &mut RawNode, ) { assert_eq!(entry.entry_type, raft::EntryType::EntryNormal); let result = entry.op().unwrap_or(&traft::Op::Nop).on_commit(); @@ -500,6 +505,9 @@ fn handle_committed_normal_entry( if let Some(traft::Op::PersistPeer { peer }) = entry.op() { pool.connect(peer.raft_id, peer.peer_address.clone()); *topology_changed = true; + if peer.health == Health::Expelled && peer.raft_id == raw_node.raft.id { + crate::tarantool::exit(0); + } } with_joint_state_latch(|joint_state_latch| { @@ -927,3 +935,70 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn std::error::Error box_replication, }) } + +// Lua API entrypoint, run on any node. +pub fn expel_wrapper(instance_id: String) -> Result<(), traft::error::Error> { + match expel_by_instance_id(instance_id) { + Ok(ExpelResponse {}) => Ok(()), + Err(e) => Err(traft::error::Error::Other(e)), + } +} + +fn expel_by_instance_id(instance_id: String) -> Result<ExpelResponse, Box<dyn std::error::Error>> { + let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; + + expel(ExpelRequest { + instance_id, + cluster_id, + }) +} + +// NetBox entrypoint. Run on any node. +#[proc(packed_args)] +fn raft_expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> { + expel(req) +} + +// Netbox entrypoint. For run on Leader only. Don't call directly, use `raft_expel` instead. +#[proc(packed_args)] +fn raft_expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> { + expel_on_leader(req) +} + +fn expel(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> { + let node = global()?; + let leader_id = node.status().leader_id.ok_or("leader_id not found")?; + let leader = Storage::peer_by_raft_id(leader_id).unwrap().unwrap(); + let leader_address = leader.peer_address; + + let fn_name = stringify_cfunc!(traft::node::raft_expel_on_leader); + + match crate::tarantool::net_box_call(&leader_address, fn_name, &req, Duration::MAX) { + Ok::<traft::ExpelResponse, _>(_resp) => Ok(ExpelResponse {}), + Err(e) => Err(Box::new(e)), + } +} + +fn expel_on_leader(req: ExpelRequest) -> Result<ExpelResponse, Box<dyn std::error::Error>> { + let cluster_id = Storage::cluster_id()?.ok_or("cluster_id is not set yet")?; + + if req.cluster_id != cluster_id { + return Err(Box::new(Error::ClusterIdMismatch { + instance_cluster_id: req.cluster_id, + cluster_cluster_id: cluster_id, + })); + } + + let node = global()?; + + let leader_id = node.status().leader_id.ok_or("leader_id not found")?; + + if node.raft_id != leader_id { + return Err(Box::from("not a leader")); + } + + let req = UpdatePeerRequest::set_expelled(req.instance_id, req.cluster_id); + node.handle_topology_request_and_wait(req.into())?; + + Ok(ExpelResponse {}) +} diff --git a/src/traft/storage.rs b/src/traft/storage.rs index 89c669dc33b695455ee3e4691e3a32b25b4235f8..c05ebade3452c6c9730f873747cddf67bef33713 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -59,6 +59,7 @@ impl Storage { parts = {{'index'}}, }) + box.schema.space.create('raft_state', { if_not_exists = true, is_local = true, @@ -67,12 +68,12 @@ impl Storage { {name = 'value', type = 'any', is_nullable = false}, } }) - box.space.raft_state:create_index('pk', { if_not_exists = true, parts = {{'key'}}, }) + box.schema.space.create('raft_group', { if_not_exists = true, is_local = true, @@ -88,7 +89,6 @@ impl Storage { {name = 'failure_domain', type = 'map', is_nullable = false}, } }) - box.space.raft_group:create_index('instance_id', { if_not_exists = true, parts = {{'instance_id'}}, @@ -315,9 +315,9 @@ impl Storage { } #[allow(dead_code)] - pub fn delete_peer(raft_id: RaftId) -> Result<(), StorageError> { + pub fn delete_peer(instance_id: &str) -> Result<(), StorageError> { Storage::space(RAFT_GROUP)? - .delete(&[raft_id]) + .delete(&[instance_id]) .map_err(box_err!())?; Ok(()) diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 4085b55e09149eafb8dfa08ec81380c4922fed7e..1a587d4dd3f816c4daf34322d76c6563a80a28ec 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -41,6 +41,15 @@ impl Topology { self } + pub fn get_peer(&mut self, instance_id: &str) -> Result<Peer, String> { + let peer = self + .instance_map + .get_mut(instance_id) + .ok_or_else(|| format!("unknown instance {}", instance_id))?; + + Ok(peer.clone()) + } + fn put_peer(&mut self, peer: Peer) { self.max_raft_id = std::cmp::max(self.max_raft_id, peer.raft_id); @@ -183,6 +192,12 @@ impl Topology { health: Health, failure_domain: Option<FailureDomain>, ) -> Result<Peer, String> { + let current_peer = self.get_peer(instance_id).unwrap(); + let health = match current_peer.health { + Health::Expelled => Health::Expelled, + _ => health, + }; + let this = self as *const Self; let mut peer = self diff --git a/test/conftest.py b/test/conftest.py index b40a7d6f129ac49c7c9b05ff540b61d5c7bdf27a..b4cfc6122d867a14a4ecbeb8cafd4a48a6b256bf 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -548,6 +548,26 @@ class Cluster: def remove_data(self): rmtree(self.data_dir) + def expel(self, target: Instance, peer: Instance = None): + peer = peer if peer else target + + # fmt: off + command = [ + self.binary_path, "expel", + "--peer", peer.listen, + "--cluster-id", target.cluster_id, + "--instance-id", target.instance_id, + ] + # fmt: on + + subprocess.Popen( + command, + stdin=subprocess.DEVNULL, + start_new_session=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + @pytest.fixture(scope="session") def compile() -> None: @@ -611,3 +631,13 @@ def retrying(fn, timeout=3): except AssertionError as ex: if (datetime.now() - start).seconds > timeout: raise ex from ex + + +def pid_alive(pid): + """Check For the existence of a unix pid.""" + try: + os.kill(pid, 0) + except OSError: + return False + else: + return True diff --git a/test/int/test_expelling.py b/test/int/test_expelling.py new file mode 100644 index 0000000000000000000000000000000000000000..df4dd8d14e1964bd194bdc04c71cfaa668197f95 --- /dev/null +++ b/test/int/test_expelling.py @@ -0,0 +1,90 @@ +import pytest +from conftest import Cluster, Instance, retrying, pid_alive + + +@pytest.fixture +def cluster3(cluster: Cluster): + cluster.deploy(instance_count=3) + return cluster + + +def assert_peer_expelled(expelled_peer: Instance, instance: Instance): + health = instance.eval( + "return box.space.raft_group.index.instance_id:get(...).health", + expelled_peer.instance_id, + ) + # print(health) + assert health == "Expelled" + + +def assert_voters(voters: list[Instance], instance: Instance): + expected_voters = list(map(lambda i: i.raft_id, voters)) + real_voters = instance.eval("return box.space.raft_state:get('voters').value") + assert real_voters.sort() == expected_voters.sort() + + +def assert_pid_down(pid): + assert not pid_alive(pid) + + +def test_expel_follower(cluster3: Cluster): + # Scenario: expel a Follower instance by command to Leader + # Given a cluster + # When a Follower instance expelled from the cluster + # Then the instance marked as expelled in the peers table + # And excluded from the voters list + + i1, i2, i3 = cluster3.instances + i1.promote_or_fail() + + i3.assert_raft_status("Follower", leader_id=i1.raft_id) + + cluster3.expel(i3, i1) + + retrying(lambda: assert_peer_expelled(i3, i1)) + retrying(lambda: assert_voters([i1, i2], i1)) + + # assert i3.process + # retrying(lambda: assert_pid_down(i3.process.pid)) + + +def test_expel_leader(cluster3: Cluster): + # Scenario: expel a Leader instance by command to itself + # Given a cluster + # When a Leader instance expelled from the cluster + # Then the instance marked as expelled in the peers table + # And excluded from the voters list + + i1, i2, i3 = cluster3.instances + i1.promote_or_fail() + + i1.assert_raft_status("Leader") + + cluster3.expel(i1) + + retrying(lambda: assert_peer_expelled(i1, i2)) + retrying(lambda: assert_voters([i2, i3], i2)) + + # assert i1.process + # retrying(lambda: assert_pid_down(i1.process.pid)) + + +def test_expel_by_follower(cluster3: Cluster): + # Scenario: expel an instance by command to a Follower + # Given a cluster + # When instance which is not a Leader receives expel CLI command + # Then expelling instance is expelled + + i1, i2, i3 = cluster3.instances + i1.promote_or_fail() + + i2.assert_raft_status("Follower", leader_id=i1.raft_id) + i3.assert_raft_status("Follower", leader_id=i1.raft_id) + + cluster3.expel(i3, i2) + + retrying(lambda: assert_peer_expelled(i3, i1)) + retrying(lambda: assert_voters([i1, i2], i1)) + + # assert i3.process + # retrying(lambda: assert_pid_down(i3.process.pid)) diff --git a/test/int/test_supervisor.py b/test/int/test_supervisor.py index 684bf1fa31c3992a2321c7196fbbfc8a5c974f82..2a36701743d660ab9ffec842ca7e5a22af4fb763 100644 --- a/test/int/test_supervisor.py +++ b/test/int/test_supervisor.py @@ -8,6 +8,7 @@ from conftest import ( Cluster, Instance, retrying, + pid_alive, ) from functools import reduce @@ -32,16 +33,6 @@ def pgrep_tree(pid): return [pid] -def pid_alive(pid): - """Check For the existence of a unix pid.""" - try: - os.kill(pid, 0) - except OSError: - return False - else: - return True - - def assert_all_pids_down(pids): assert all(map(lambda pid: not pid_alive(pid), pids)) diff --git a/test/rand/test_randomized.py b/test/rand/test_randomized.py index 75de31078e57e8394102eb5d021b01c2ebf7cede..7721383e5bd38a7dd5d1b754d97d9b1a7d4cdefd 100644 --- a/test/rand/test_randomized.py +++ b/test/rand/test_randomized.py @@ -12,19 +12,25 @@ def create(c: Cluster, istate): return i, istate -def stop(i: Instance, istate): +def stop(_: Cluster, i: Instance, istate): istate[i.instance_id]["started"] = False return i.terminate(), istate -def start(i: Instance, istate): +def start(_: Cluster, i: Instance, istate): istate[i.instance_id]["started"] = True return i.start(), istate +def expel(c: Cluster, i: Instance, istate): + istate[i.instance_id]["started"] = False + return c.expel(i) + + ADD = "add" STOP = "stop" START = "start" +EXPEL = "expel" ACTIONS = { ADD: { "name": ADD, @@ -42,6 +48,11 @@ ACTIONS = { "repr_fn": lambda i: f"Start {i}", "exec_fn": start, }, + EXPEL: { + "name": EXPEL, + "repr_fn": lambda i: f"Expel {i}", + "exec_fn": expel, + }, } BASE = len(ACTIONS) @@ -103,7 +114,7 @@ def test_randomized(cluster: Cluster, seed: str, delay: int, capsys): if "pre_fn" in a.keys(): i, istate = a["pre_fn"](cluster, istate) print(step_msg(step + 1, a, i)) - _, istate = a["exec_fn"](i, istate) + _, istate = a["exec_fn"](cluster, i, istate) time.sleep(delay / 1000) for instance_id in istate: