diff --git a/src/governor/plan.rs b/src/governor/plan.rs
index 3a2c738bc1db4ad13a256827e4667f4cb39fcc0d..4d427b300b710571049df80528d0b48d5057beb7 100644
--- a/src/governor/plan.rs
+++ b/src/governor/plan.rs
@@ -340,7 +340,17 @@ pub(super) fn action_plan<'i>(
     if has_pending_schema_change {
         let mut targets = Vec::with_capacity(replicasets.len());
         for r in replicasets.values() {
-            targets.push(&r.master_id);
+            let Some(master) = instances.iter().find(|i| i.instance_id == r.master_id) else {
+                tlog!(Warning,
+                    "couldn't find instance with id {}, which is chosen as master of replicaset {}",
+                    r.master_id, r.replicaset_id,
+                );
+                continue;
+            };
+            if !master.may_respond() {
+                continue;
+            }
+            targets.push(&master.instance_id);
         }
 
         let rpc = rpc::ddl_apply::Request {
diff --git a/src/traft/node.rs b/src/traft/node.rs
index 01df7c6f2e34642fbd738f214432b7aeda658606..454a8ca282a49f822a6feef2290624591385abd2 100644
--- a/src/traft/node.rs
+++ b/src/traft/node.rs
@@ -17,7 +17,6 @@ use crate::storage::pico_schema_version;
 use crate::storage::ToEntryIter as _;
 use crate::storage::{Clusterwide, ClusterwideSpace, ClusterwideSpaceIndex, PropertyName};
 use crate::stringify_cfunc;
-use crate::tarantool::eval as lua_eval;
 use crate::tlog;
 use crate::traft;
 use crate::traft::error::Error;
@@ -805,9 +804,8 @@ impl NodeImpl {
                 // This instance is catching up to the cluster and must sync
                 // with replication master on it's own.
                 if current_version_in_tarantool < pending_version {
-                    // TODO: check if replication master in _pico_replicaset
-                    let not_master = lua_eval::<bool>("return box.info.ro").expect("lua error");
-                    if not_master {
+                    let is_master = self.is_replicaset_master().expect("storage_error");
+                    if !is_master {
                         return true; // wait_lsn
                     }
                 }
@@ -846,8 +844,7 @@ impl NodeImpl {
                 // own.
                 // FIXME: copy-pasted from above
                 if current_version_in_tarantool < pending_version {
-                    // TODO: check if replication master in _pico_replicaset
-                    let is_master = !lua_eval::<bool>("return box.info.ro").expect("lua error");
+                    let is_master = self.is_replicaset_master().expect("storage_error");
                     if is_master {
                         let resp =
                             ddl_apply::apply_schema_change(&self.storage, &ddl, pending_version)
@@ -888,8 +885,7 @@ impl NodeImpl {
                 // This condition means, schema versions must always increase
                 // even after an DdlAbort
                 if current_version_in_tarantool == pending_version {
-                    // TODO: check if replication master in _pico_replicaset
-                    let is_master = !lua_eval::<bool>("return box.info.ro").expect("lua error");
+                    let is_master = self.is_replicaset_master().expect("storage_error");
                     if !is_master {
                         // wait_lsn
                         return true;
@@ -914,8 +910,7 @@ impl NodeImpl {
                 // Update tarantool metadata.
                 // FIXME: copy-pasted from above
                 if current_version_in_tarantool == pending_version {
-                    // TODO: check if replication master in _pico_replicaset
-                    let is_master = !lua_eval::<bool>("return box.info.ro").expect("lua error");
+                    let is_master = self.is_replicaset_master().expect("storage_error");
                     if is_master {
                         let current_version = storage_properties
                             .current_schema_version()
@@ -1023,7 +1018,15 @@ impl NodeImpl {
                     unique: true,
                     local: true,
                 };
-                self.storage.indexes.insert(&primary_key_def)?;
+                let res = self.storage.indexes.insert(&primary_key_def);
+                if let Err(e) = res {
+                    // Ignore the error for now, let governor deal with it.
+                    tlog!(
+                        Warning,
+                        "failed creating index '{}': {e}",
+                        primary_key_def.name
+                    );
+                }
 
                 match distribution {
                     Distribution::Global => {
@@ -1054,7 +1057,15 @@ impl NodeImpl {
                         // TODO: support other cases
                         local: true,
                     };
-                    self.storage.indexes.insert(&bucket_id_def)?;
+                    let res = self.storage.indexes.insert(&bucket_id_def);
+                    if let Err(e) = res {
+                        // Ignore the error for now, let governor deal with it.
+                        tlog!(
+                            Warning,
+                            "failed creating index '{}': {e}",
+                            bucket_id_def.name
+                        );
+                    }
                 }
             }
 
@@ -1066,7 +1077,11 @@ impl NodeImpl {
                     format,
                     operable: false,
                 };
-                self.storage.spaces.insert(&space_def)?;
+                let res = self.storage.spaces.insert(&space_def);
+                if let Err(e) = res {
+                    // Ignore the error for now, let governor deal with it.
+                    tlog!(Warning, "failed creating space '{}': {e}", space_def.name);
+                }
             }
 
             Ddl::CreateIndex {
@@ -1339,11 +1354,15 @@ impl NodeImpl {
         assert!(self.raw_node.raft.state != RaftStateRole::Leader);
 
         let my_id = self.raw_node.raft.id;
-        let replicaset_id = self.storage.instances.get(&my_id)?.replicaset_id;
+        let my_instance_info = self.storage.instances.get(&my_id)?;
+        let replicaset_id = my_instance_info.replicaset_id;
         let replicaset = self.storage.replicasets.get(&replicaset_id)?;
         let replicaset = replicaset.ok_or_else(|| {
             Error::other(format!("replicaset info for id {replicaset_id} not found"))
         })?;
+        if replicaset.master_id == my_instance_info.instance_id {
+            return Err(Error::other("wait_lsn called on replicaset master"));
+        }
         let master = self.storage.instances.get(&replicaset.master_id)?;
         let master_uuid = master.instance_uuid;
 
@@ -1385,6 +1404,21 @@ impl NodeImpl {
         }
     }
 
+    fn is_replicaset_master(&self) -> traft::Result<bool> {
+        let my_raft_id = self.raw_node.raft.id;
+        let my_instance_info = self.storage.instances.get(&my_raft_id)?;
+        let replicaset_id = my_instance_info.replicaset_id;
+        let replicaset = self.storage.replicasets.get(&replicaset_id)?;
+        let res = if let Some(replicaset) = replicaset {
+            my_instance_info.instance_id == replicaset.master_id
+        } else {
+            // Replicaset wasn't initialized yet, fall back to Lua eval
+            let is_ro: bool = crate::tarantool::eval("return box.info.ro")?;
+            !is_ro
+        };
+        Ok(res)
+    }
+
     #[inline]
     fn cleanup_notifications(&mut self) {
         self.notifications.retain(|_, notify| !notify.is_closed());
diff --git a/test/int/test_ddl.py b/test/int/test_ddl.py
index 176f14ea2accfbcd9245a805865942e801bb1f99..65cf6f14b55bfbf74b59d6493c34bca9ee949fd9 100644
--- a/test/int/test_ddl.py
+++ b/test/int/test_ddl.py
@@ -300,7 +300,19 @@ def test_ddl_create_space_partial_failure(cluster: Cluster):
     assert i2.call("box.space._space:get", 666) is None
     assert i3.call("box.space._space:get", 666) is None
 
-    # TODO: terminate i3, commit create space and wake i3 back up
+    # Put i3 to sleep
+    i3.terminate()
+
+    # Propose the same space creation, which this time succeeds because
+    # there's no conflict on any online instance.
+    index = i1.ddl_create_space(space_def)
+    i2.call(".proc_sync_raft", index, (3, 0))
+
+    assert i1.call("box.space._space:get", 666) is not None
+    assert i2.call("box.space._space:get", 666) is not None
+
+    # Wake i3 back up; currently it just panics...
+    i3.fail_to_start()
 
 
 def test_ddl_from_snapshot(cluster: Cluster):
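
Note on the three insert-and-warn blocks in `src/traft/node.rs`: they repeat the same log-and-continue shape (the surrounding code already carries a "FIXME: copy-pasted from above"). A possible follow-up is to fold them into one helper; the sketch below is hypothetical and not part of this patch — `log_and_ignore` and its generic bounds are assumptions, only `tlog!` comes from the crate as used above.

```rust
use std::fmt::Display;

// Hypothetical helper (a sketch, not part of this patch): log a failed
// metadata insert at Warning level and swallow the error, leaving recovery
// to the governor, exactly as the three inlined blocks above do.
fn log_and_ignore<T, E: Display>(kind: &str, name: &str, res: Result<T, E>) {
    if let Err(e) = res {
        // Ignore the error for now, let governor deal with it.
        tlog!(Warning, "failed creating {} '{}': {e}", kind, name);
    }
}
```

With such a helper, the first call site would shrink to `log_and_ignore("index", &primary_key_def.name, self.storage.indexes.insert(&primary_key_def));`.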