diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6b9c4bf07e2382551030666f75cde6f9b7190f92..a0b89408422c5c89a657cd96ffc57e41359c4336 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -64,6 +64,10 @@ to 2 and 3.
 - It's no longer possible to execute DML queries for tables that are not operable
 
 - Fixed panic on user/role creation when max user number was exceeded
 
+- `picodata expel` used to return before the instance was actually expelled.
+  Now it blocks until the instance is completely expelled or the timeout is
+  exceeded.
+
 ## [24.6.1] - 2024-10-28
 
 ### Configuration
diff --git a/src/cli/args.rs b/src/cli/args.rs
index cd6a4616cc50fd5bc05f51d1db2419bc138e8d2e..e8a49c3141e8bc25a7f21af3aa4d7825a425afd5 100644
--- a/src/cli/args.rs
+++ b/src/cli/args.rs
@@ -393,10 +393,9 @@ pub struct Expel {
         short = 't',
         long = "timeout",
         value_name = "TIMEOUT",
-        default_value = "5",
-        env = "PICODATA_CONNECT_TIMEOUT"
+        default_value = "60"
     )]
-    /// Connection timeout in seconds.
+    /// Time in seconds to wait for the operation to complete.
    pub timeout: u64,
 }
diff --git a/src/cli/connect.rs b/src/cli/connect.rs
index fe8b9a24c6b9fb48ef321ada438beabb28cdea7d..2e07a143ffea6683f15c56e0f733cd6b87f61a84 100644
--- a/src/cli/connect.rs
+++ b/src/cli/connect.rs
@@ -161,7 +161,7 @@ pub fn determine_credentials_and_connect(
     user: Option<&str>,
     password_file: Option<&str>,
     auth_method: AuthMethod,
-    timeout: u64,
+    timeout: Duration,
 ) -> Result<(Client, String), Error> {
     let user = if let Some(user) = &address.user {
         user
@@ -184,7 +184,7 @@ pub fn determine_credentials_and_connect(
     let mut config = Config::default();
     config.creds = Some((user.into(), password));
     config.auth_method = auth_method;
-    config.connect_timeout = Some(Duration::from_secs(timeout));
+    config.connect_timeout = Some(timeout);
 
     let port = match address.port.parse::<u16>() {
         Ok(port) => port,
@@ -208,7 +208,7 @@ fn sql_repl(args: args::Connect) -> Result<(), ReplError> {
         Some(&args.user),
         args.password_file.as_deref(),
         args.auth_method,
-        args.timeout,
+        Duration::from_secs(args.timeout),
     )
     .map_err(|err| ReplError::Other(format!("Connection Error. Try to reconnect: {}", err)))?;
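
The CLI now treats `--timeout` as a budget for the whole expel operation rather than a bare connection timeout: `src/cli/expel.rs` below computes a deadline once, lets the connection attempt consume part of the budget, and forwards only the remainder to the RPC. A minimal sketch of that pattern, using `std::time::Instant` in place of tarantool's `fiber::clock()` and hypothetical `connect`/`call_expel` helpers instead of the real client API:

```rust
use std::time::{Duration, Instant};

// Hypothetical stand-ins for the real connect and RPC calls.
struct Client;

fn connect(_addr: &str, _connect_timeout: Duration) -> Result<Client, String> {
    Ok(Client)
}

impl Client {
    fn call_expel(&self, _uuid: &str, _timeout: Duration) -> Result<(), String> {
        Ok(())
    }
}

fn expel_with_budget(addr: &str, uuid: &str, budget: Duration) -> Result<(), String> {
    // One deadline for the whole operation, computed up front.
    let deadline = Instant::now() + budget;

    // Connecting may eat into the budget...
    let client = connect(addr, budget)?;

    // ...so the server-side wait only gets whatever time is left.
    let remaining = deadline.saturating_duration_since(Instant::now());
    client.call_expel(uuid, remaining)
}
```

Recomputing the remaining time after the connection is established is what keeps a slow connect from silently extending the overall wait.
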
diff --git a/src/cli/expel.rs b/src/cli/expel.rs
index 4e32d4659d4f0c68e23daad2db24dc84a59652da..b897639ba060048d5dc3680d73e066059f927e30 100644
--- a/src/cli/expel.rs
+++ b/src/cli/expel.rs
@@ -4,21 +4,26 @@ use crate::rpc::expel::redirect::proc_expel_redirect;
 use crate::rpc::expel::Request as ExpelRequest;
 use crate::tlog;
 use crate::traft::error::Error;
+use std::time::Duration;
 use tarantool::fiber;
 use tarantool::network::client::AsClient;
 
 pub async fn tt_expel(args: args::Expel) -> Result<(), Error> {
+    let timeout = Duration::from_secs(args.timeout);
+    let deadline = fiber::clock().saturating_add(timeout);
     let (client, _) = determine_credentials_and_connect(
         &args.peer_address,
         Some("admin"),
         args.password_file.as_deref(),
         args.auth_method,
-        args.timeout,
+        timeout,
     )?;
 
+    let timeout = deadline.duration_since(fiber::clock());
     let req = ExpelRequest {
         cluster_name: args.cluster_name,
         instance_uuid: args.instance_uuid.clone(),
+        timeout,
     };
     fiber::block_on(client.call(crate::proc_name!(proc_expel_redirect), &req))
         .map_err(|e| Error::other(format!("Failed to expel instance: {e}")))?;
diff --git a/src/cli/plugin.rs b/src/cli/plugin.rs
index 897ec41af16e49321e8b25a9fa07e5210bcd098a..259e44a7c414c0f7379195f7829da0eb9e5f1bfd 100644
--- a/src/cli/plugin.rs
+++ b/src/cli/plugin.rs
@@ -1,3 +1,4 @@
+use std::time::Duration;
 use std::{collections::HashMap, fs::read_to_string};
 
 use crate::{
@@ -88,6 +89,8 @@ fn main_impl(args: Plugin) -> Result<(), ReplError> {
         service_names,
         password_file,
     } = cfg;
+    // FIXME: should log a warning that the username specified by the user
+    // is ignored
     address.user = None; // ignore username, we will connect as `pico_service`
 
     let password_file = password_file.as_ref().and_then(|path| path.to_str());
@@ -95,12 +98,16 @@ fn main_impl(args: Plugin) -> Result<(), ReplError> {
     let config_values: ConfigRepr = serde_yaml::from_str(&config_raw)
         .map_err(|err| ReplError::Other(err.to_string()))?;
 
+    // FIXME: there should be a --timeout commandline parameter for this
+    // <https://git.picodata.io/core/picodata/-/issues/1234>
+    let timeout = Duration::from_secs(10);
+
     let (client, _) = determine_credentials_and_connect(
         &address,
         Some(PICO_SERVICE_USER_NAME),
         password_file,
         AuthMethod::ChapSha1,
-        10, /* timeout */
+        timeout,
     )
     .map_err(|err| ReplError::Other(err.to_string()))?;
diff --git a/src/luamod.rs b/src/luamod.rs
index 85c7db454442c694a17dfb789010db0aed4816a6..1fda9aa0e5f934f3868fa9d7930f4c67dc38a5d9 100644
--- a/src/luamod.rs
+++ b/src/luamod.rs
@@ -704,6 +704,8 @@ pub(crate) fn setup() {
         Params:
 
             1. instance_name (string)
+            2. opts (table)
+                - timeout (number)
 
         Returns:
 
@@ -711,21 +713,35 @@
             or
             (nil, string) in case of an error
         "},
-        tlua::function1(|instance_name: InstanceName| -> traft::Result<bool> {
-            let node = traft::node::global()?;
-            let raft_storage = &node.raft_storage;
-            let instance = node.storage.instances.get(&instance_name)?;
-            let cluster_name = raft_storage.cluster_name()?;
-            fiber::block_on(rpc::network_call_to_leader(
-                crate::proc_name!(rpc::expel::proc_expel),
-                &rpc::expel::Request {
-                    instance_uuid: instance.uuid,
-                    cluster_name,
-                },
-            ))?;
-            Ok(true)
-        }),
+        tlua::Function::new(
+            |instance_name: InstanceName, opts: Option<PicoExpelOptions>| -> traft::Result<bool> {
+                let mut timeout = Duration::from_secs(3600);
+                if let Some(opts) = opts {
+                    if let Some(t) = opts.timeout {
+                        timeout = Duration::from_secs_f64(t);
+                    }
+                }
+
+                let node = traft::node::global()?;
+                let raft_storage = &node.raft_storage;
+                let instance = node.storage.instances.get(&instance_name)?;
+                let cluster_name = raft_storage.cluster_name()?;
+                fiber::block_on(rpc::network_call_to_leader(
+                    crate::proc_name!(rpc::expel::proc_expel),
+                    &rpc::expel::Request {
+                        instance_uuid: instance.uuid,
+                        cluster_name,
+                        timeout,
+                    },
+                ))?;
+                Ok(true)
+            },
+        ),
     );
 
+    #[derive(tlua::LuaRead)]
+    struct PicoExpelOptions {
+        timeout: Option<f64>,
+    }
+
     ///////////////////////////////////////////////////////////////////////////
     #[derive(::tarantool::tlua::LuaRead, Default, Clone, Copy)]
diff --git a/src/rpc/expel.rs b/src/rpc/expel.rs
index 6579043a25c81070dd6d764616f8b5525af1af9d..f7da9b5f04351aeee26fc81341d49772f15df026 100644
--- a/src/rpc/expel.rs
+++ b/src/rpc/expel.rs
@@ -1,13 +1,13 @@
 use std::time::Duration;
 
-use crate::instance::{Instance, StateVariant::*};
+use crate::has_states;
+use crate::instance::StateVariant::*;
 use crate::rpc;
 use crate::rpc::update_instance::handle_update_instance_request_and_wait;
-use crate::traft::error::IdOfInstance;
 use crate::traft::Result;
 use crate::traft::{error::Error, node};
-
-const TIMEOUT: Duration = Duration::from_secs(10);
+use tarantool::error::BoxError;
+use tarantool::error::TarantoolErrorCode;
 
 crate::define_rpc_request! {
     /// Submits a request to expel the specified instance. If successful
@@ -34,14 +34,21 @@
             });
         }
 
-        let instance = node.storage.instances.by_uuid(&req.instance_uuid)?;
-        let Some(Instance { name, .. }) = instance else {
-            return Err(Error::NoSuchInstance(IdOfInstance::Uuid(req.instance_uuid)));
-        };
+        let topology_ref = node.topology_cache.get();
+        let instance = topology_ref.instance_by_uuid(&req.instance_uuid)?;
+        if has_states!(instance, * -> Expelled) {
+            // Idempotency
+            return Ok(Response {});
+        }
 
-        let req = rpc::update_instance::Request::new(name, req.cluster_name)
+        let timeout = req.timeout;
+        let req = rpc::update_instance::Request::new(instance.name.clone(), req.cluster_name)
             .with_target_state(Expelled);
-        handle_update_instance_request_and_wait(req, TIMEOUT)?;
+
+        // Must not hold this reference across yields
+        drop(topology_ref);
+
+        handle_update_instance_request_and_wait(req, timeout)?;
 
         Ok(Response {})
     }
@@ -52,12 +59,14 @@
     pub struct Request {
         pub cluster_name: String,
         pub instance_uuid: String,
+        pub timeout: Duration,
     }
 
     pub struct Response {}
 }
 
 pub mod redirect {
+    use super::*;
     use ::tarantool::fiber;
 
     use crate::rpc::network_call_to_leader;
@@ -66,7 +75,31 @@ pub mod redirect {
     crate::define_rpc_request! {
         fn proc_expel_redirect(req: Request) -> Result<Response> {
             let Request(req_to_leader) = req;
+
+            let deadline = fiber::clock().saturating_add(req_to_leader.timeout);
             fiber::block_on(network_call_to_leader(crate::proc_name!(super::proc_expel), &req_to_leader))?;
+
+            let node = node::global()?;
+            let instance_uuid = &req_to_leader.instance_uuid;
+            loop {
+                let topology_ref = node.topology_cache.get();
+                let instance = topology_ref.instance_by_uuid(instance_uuid)?;
+                if has_states!(instance, Expelled -> *) {
+                    break
+                }
+
+                let now = fiber::clock();
+                if now > deadline {
+                    return Err(BoxError::new(TarantoolErrorCode::Timeout, "expel confirmation didn't arrive in time").into());
+                }
+
+                // Must not hold this reference across yields
+                drop(topology_ref);
+
+                let timeout = deadline.duration_since(now);
+                _ = node.wait_index(node.get_index() + 1, timeout);
+            }
+
             Ok(Response {})
         }
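
The confirmation loop added to `proc_expel_redirect` above is the core of the new blocking behaviour: after the leader accepts the request, the proc re-reads the topology cache until the instance reaches the `Expelled` state and returns a `Timeout` error once the deadline passes. Waiting on `node.wait_index(node.get_index() + 1, timeout)` resumes the fiber as soon as the next raft entry is applied instead of busy-polling. A simplified, self-contained sketch of that wait-with-deadline shape, with assumed names, plain `std` types, and a sleep-based poll standing in for the raft-index wakeup:

```rust
use std::time::{Duration, Instant};

/// Wait until `is_done` returns true or `deadline` passes.
///
/// The real `proc_expel_redirect` wakes up via `node.wait_index(applied + 1, ..)`
/// as soon as the next raft entry is applied; this sketch just polls on a
/// fixed interval instead.
fn wait_until(mut is_done: impl FnMut() -> bool, deadline: Instant) -> Result<(), String> {
    loop {
        if is_done() {
            return Ok(());
        }

        let now = Instant::now();
        if now > deadline {
            return Err("expel confirmation didn't arrive in time".into());
        }

        // Never sleep past the deadline.
        let step = deadline
            .saturating_duration_since(now)
            .min(Duration::from_millis(100));
        std::thread::sleep(step);
    }
}
```

A caller would pass a predicate such as a hypothetical `|| instance_is_expelled()` check together with `Instant::now() + Duration::from_secs(60)` as the deadline; the test changes below exercise exactly this timeout path through the CLI.
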
diff --git a/test/conftest.py b/test/conftest.py
index ecc2d46e04f5ec9fcb7dc91e61bd609761de6b19..4db2cb8ec193360631d92eb2a3deeb122e40e406 100644
--- a/test/conftest.py
+++ b/test/conftest.py
@@ -449,6 +449,19 @@ class NotALeader(Exception):
     pass
 
 
+class CommandFailed(Exception):
+    def __init__(self, stdout, stderr):
+        self.stdout = maybe_decode_utf8(stdout)
+        self.stderr = maybe_decode_utf8(stderr)
+
+
+def maybe_decode_utf8(s: bytes) -> str | bytes:
+    try:
+        return s.decode()
+    except UnicodeDecodeError:
+        return s
+
+
 class Retriable:
     """A utility class for handling retries.
 
@@ -1890,6 +1903,7 @@ class Cluster:
         self,
         target: Instance,
         peer: Instance | None = None,
+        timeout: int = 10,
     ):
         peer = peer if peer else target
         assert self.service_password_file, "cannot expel without pico_service password"
@@ -1903,16 +1917,20 @@ class Cluster:
             "--cluster-name", target.cluster_name or "",
            "--password-file", self.service_password_file,
            "--auth-type", "chap-sha1",
+            "--timeout", str(timeout),
            target_uuid,
         ]
         # fmt: on
 
         print(f"executing: {command}")
-        rc = subprocess.call(
-            command,
-            stdin=subprocess.DEVNULL,
-        )
-        assert rc == 0
+        try:
+            subprocess.check_output(
+                command,
+                stdin=subprocess.DEVNULL,
+                stderr=subprocess.PIPE,
+            )
+        except subprocess.CalledProcessError as e:
+            raise CommandFailed(e.stdout, e.stderr) from e
 
     def raft_wait_index(self, index: int, timeout: float = 10):
         """
diff --git a/test/int/test_expelling.py b/test/int/test_expelling.py
index e5de7e145cb783e9536549dc5214d42dc244bedf..99cb30d5296e3fef53e29789ea945a60f22dfdd0 100644
--- a/test/int/test_expelling.py
+++ b/test/int/test_expelling.py
@@ -1,13 +1,14 @@
 import pytest
 import sys
 import pexpect  # type: ignore
-from conftest import CLI_TIMEOUT, Cluster, Instance, Retriable, log_crawler
-
-
-@pytest.fixture
-def cluster3(cluster: Cluster):
-    cluster.deploy(instance_count=3)
-    return cluster
+from conftest import (
+    CLI_TIMEOUT,
+    Cluster,
+    Instance,
+    Retriable,
+    log_crawler,
+    CommandFailed,
+)
 
 
 def assert_instance_expelled(expelled_instance: Instance, instance: Instance):
@@ -22,19 +23,19 @@ def assert_voters(voters: list[Instance], instance: Instance):
     assert sorted(actual_voters) == sorted(expected_voters)
 
 
-def test_expel_follower(cluster3: Cluster):
+def test_expel_follower(cluster: Cluster):
     # Scenario: expel a Follower instance by command to Leader
    # Given a cluster
    # When a Follower instance expelled from the cluster
    # Then the instance marked as expelled in the instances table
    # And excluded from the voters list
 
-    i1, i2, i3 = cluster3.instances
+    i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=3)
     i1.promote_or_fail()
 
     i3.assert_raft_status("Follower", leader_id=i1.raft_id)
 
-    cluster3.expel(i3, i1)
+    cluster.expel(i3, i1)
     Retriable(timeout=30).call(lambda: assert_instance_expelled(i3, i1))
     Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1))
 
@@ -47,19 +48,19 @@ def test_expel_follower(cluster3: Cluster):
     assert lc.matched
 
 
-def test_expel_leader(cluster3: Cluster):
+def test_expel_leader(cluster: Cluster):
     # Scenario: expel a Leader instance by command to itself
    # Given a cluster
    # When a Leader instance expelled from the cluster
    # Then the instance marked as expelled in the instances table
    # And excluded from the voters list
 
-    i1, i2, i3 = cluster3.instances
+    i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=3)
     i1.promote_or_fail()
 
     i1.assert_raft_status("Leader")
 
-    cluster3.expel(i1)
+    cluster.expel(i1)
     Retriable(timeout=30).call(lambda: assert_instance_expelled(i1, i2))
     Retriable(timeout=10).call(lambda: assert_voters([i2, i3], i2))
 
@@ -72,19 +73,19 @@ def test_expel_leader(cluster3: Cluster):
     assert lc.matched
 
 
-def test_expel_by_follower(cluster3: Cluster):
+def test_expel_by_follower(cluster: Cluster):
     # Scenario: expel an instance by command to a Follower
    # Given a cluster
    # When instance which is not a Leader receives expel CLI command
    # Then expelling instance is expelled
 
-    i1, i2, i3 = cluster3.instances
+    i1, i2, i3 = cluster.deploy(instance_count=3, init_replication_factor=3)
     i1.promote_or_fail()
 
     i2.assert_raft_status("Follower", leader_id=i1.raft_id)
     i3.assert_raft_status("Follower", leader_id=i1.raft_id)
 
-    cluster3.expel(i3, i2)
+    cluster.expel(i3, i2)
     Retriable(timeout=30).call(lambda: assert_instance_expelled(i3, i1))
     Retriable(timeout=10).call(lambda: assert_voters([i1, i2], i1))
 
@@ -175,13 +176,23 @@ cluster:
     cluster.expel(storage2, peer=leader)
     leader.wait_governor_status("idle", old_step_counter=counter)
 
+    # Check `picodata expel` idempotency
+    cluster.expel(storage2, peer=leader, timeout=1)
+
     # Add another instance, it should be assigned to the no longer filled replicaset
     storage4 = cluster.add_instance(name="storage4", wait_online=True, tier="storage")
     assert storage4.replicaset_name == "r2"
 
     # Attempt to expel an offline replicaset
     storage3.terminate()
-    cluster.expel(storage3, peer=leader)
+    with pytest.raises(CommandFailed) as e:
+        cluster.expel(storage3, peer=leader, timeout=1)
+    assert "Timeout: expel confirmation didn't arrive in time" in e.value.stderr
+
+    # Check `picodata expel` idempotency
+    with pytest.raises(CommandFailed) as e:
+        cluster.expel(storage3, peer=leader, timeout=1)
+    assert "Timeout: expel confirmation didn't arrive in time" in e.value.stderr
 
     # Offline replicasets aren't allowed to be expelled,
     # so the cluster is blocked attempting to rebalance
diff --git a/test/int/test_replication.py b/test/int/test_replication.py
index 10a99f570c656d81840b08b546b426f2f5b5e399..efe24b6daca80b580f3deb315e71b890eebfd392 100644
--- a/test/int/test_replication.py
+++ b/test/int/test_replication.py
@@ -1,9 +1,11 @@
 import time
+import pytest
 
 from conftest import (
     Cluster,
     Instance,
     Retriable,
+    CommandFailed,
 )
 
 
@@ -270,7 +272,9 @@ def test_expel_blocked_by_replicaset_master_switchover_to_online_replica(
     i1.promote_or_fail()
 
     # Initiate master switchover by expelling i4.
-    cluster.expel(i4)
+    with pytest.raises(CommandFailed) as e:
+        cluster.expel(i4, timeout=1)
+    assert "Timeout: expel confirmation didn't arrive in time" in e.value.stderr
 
     # Wait until governor switches the replicaset master from i4 to i5
     # and tries to reconfigure replication between them which will require i5 to synchronize first.
@@ -333,7 +337,9 @@ def test_expel_blocked_by_replicaset_master_switchover_to_offline_replica(
     i1.promote_or_fail()
 
     # Initiate master switchover by expelling i4.
-    cluster.expel(i4)
+    with pytest.raises(CommandFailed) as e:
+        cluster.expel(i4, timeout=1)
+    assert "Timeout: expel confirmation didn't arrive in time" in e.value.stderr
 
     # Wait until governor switches the replicaset master from i4 to i5
     # and tries to reconfigure replication between them which will require i5 to synchronize first.