diff --git a/src/main.rs b/src/main.rs index 61cda8e237065ed4809f818efba89e351f61aa97..a6ef80a673fb33bcbaee5d63d7884959422bfeac 100644 --- a/src/main.rs +++ b/src/main.rs @@ -531,6 +531,13 @@ fn start_join(args: &args::Run, leader_address: String) { fn postjoin(args: &args::Run) { tlog!(Info, ">>>>> postjoin()"); + let mut box_cfg = tarantool::cfg().unwrap(); + + // Reset the quorum BEFORE initializing the raft node. + // Otherwise it may get stuck on the `box.cfg({replication})` call. + box_cfg.replication_connect_quorum = 0; + tarantool::set_cfg(&box_cfg); + let raft_id = traft::Storage::id().unwrap().unwrap(); let applied = traft::Storage::applied().unwrap().unwrap_or(0); let raft_cfg = raft::Config { @@ -561,10 +568,8 @@ fn postjoin(args: &args::Run) { traft::node::set_global(node); let node = traft::node::global().unwrap(); - tarantool::set_cfg(&tarantool::Cfg { - listen: Some(args.listen.clone()), - ..tarantool::cfg().unwrap() - }); + box_cfg.listen = Some(args.listen.clone()); + tarantool::set_cfg(&box_cfg); while node.status().leader_id == None { node.wait_status(); @@ -580,6 +585,10 @@ fn postjoin(args: &args::Run) { } } + let peer = traft::Storage::peer_by_raft_id(raft_id).unwrap().unwrap(); + box_cfg.replication = traft::Storage::box_replication(&peer.replicaset_id, None).unwrap(); + tarantool::set_cfg(&box_cfg); + loop { let timeout = Duration::from_millis(220); let me = traft::Storage::peer_by_raft_id(raft_id) diff --git a/src/tarantool.rs b/src/tarantool.rs index 49f24157b581c2b5283d2b16695344e56ea4fdbd..e6f9df73664dd5ed12f0211a7a5aeaff7933b01a 100644 --- a/src/tarantool.rs +++ b/src/tarantool.rs @@ -106,6 +106,7 @@ pub struct Cfg { pub instance_uuid: Option<String>, pub replicaset_uuid: Option<String>, pub replication: Vec<String>, + pub replication_connect_quorum: u8, pub wal_dir: String, pub memtx_dir: String, @@ -123,6 +124,7 @@ impl Default for Cfg { instance_uuid: None, replicaset_uuid: None, replication: vec![], + replication_connect_quorum: 32, 
wal_dir: ".".into(), memtx_dir: ".".into(), diff --git a/src/traft/node.rs b/src/traft/node.rs index 122c7cf5f0eda37c7504daf5dd5b38ee8f906434..48e3efd513774eb37f1296124510905d66b161e8 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -463,6 +463,7 @@ fn raft_main_loop( raw_node: &mut RawNode, pool: &mut ConnectionPool, joint_state_latch: &mut Option<JointStateLatch>, + config_changed: &mut bool, ) { for entry in entries .iter() @@ -521,6 +522,7 @@ fn raft_main_loop( let mut cc = raft::ConfChange::default(); cc.merge_from_bytes(&entry.data).unwrap(); + *config_changed = true; raw_node.apply_conf_change(&cc).unwrap() } raft::EntryType::EntryConfChangeV2 => { @@ -535,6 +537,7 @@ fn raft_main_loop( .try_send(Ok(entry.index)) .expect("that's a bug"); *joint_state_latch = None; + *config_changed = true; } } @@ -555,6 +558,8 @@ fn raft_main_loop( } } + let mut config_changed = false; + start_transaction(|| -> Result<(), TransactionError> { if !ready.messages().is_empty() { // Send out the messages come from the node. @@ -575,6 +580,7 @@ fn raft_main_loop( &mut raw_node, &mut pool, &mut joint_state_latch, + &mut config_changed, ); if !ready.entries().is_empty() { @@ -633,6 +639,7 @@ fn raft_main_loop( &mut raw_node, &mut pool, &mut joint_state_latch, + &mut config_changed, ); // Advance the apply index. @@ -640,6 +647,16 @@ fn raft_main_loop( Ok(()) }) .unwrap(); + + if config_changed { + if let Some(peer) = traft::Storage::peer_by_raft_id(raw_node.raft.id).unwrap() { + let mut box_cfg = crate::tarantool::cfg().unwrap(); + assert_eq!(box_cfg.replication_connect_quorum, 0); + box_cfg.replication = + traft::Storage::box_replication(&peer.replicaset_id, None).unwrap(); + crate::tarantool::set_cfg(&box_cfg); + } + } } } @@ -744,7 +761,7 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> { let peer = Storage::peer_by_instance_id(&instance_id)? 
.ok_or("the peer has misteriously disappeared")?; let raft_group = Storage::peers()?; - let box_replication = Storage::box_replication(&peer)?; + let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?; Ok(JoinResponse { peer, diff --git a/src/traft/storage.rs b/src/traft/storage.rs index a23956ac769c5aba6b6916dda77391439654bbd3..a1151294bd9e37b197358a21ed88c1d48af9243f 100644 --- a/src/traft/storage.rs +++ b/src/traft/storage.rs @@ -169,7 +169,10 @@ impl Storage { Ok(ret) } - pub fn box_replication(peer: &traft::Peer) -> Result<Vec<String>, StorageError> { + pub fn box_replication( + replicaset_id: &str, + max_index: Option<u64>, + ) -> Result<Vec<String>, StorageError> { let mut ret = Vec::new(); const IDX: &str = "replicaset_id"; @@ -177,19 +180,21 @@ impl Storage { .index(IDX) .ok_or_else(|| Error::NoSuchIndex(RAFT_GROUP.into(), IDX.into())) .map_err(box_err!())? - .select(IteratorType::GE, &(&peer.replicaset_id,)) + .select(IteratorType::GE, &(replicaset_id,)) .map_err(box_err!())?; for tuple in iter { let replica: traft::Peer = tuple.into_struct().map_err(box_err!())?; - if replica.replicaset_id != peer.replicaset_id - || replica.commit_index > peer.commit_index - { + if replica.replicaset_id != replicaset_id { // In Tarantool the iteration must be interrupted explicitly. 
break; } + if matches!(max_index, Some(idx) if replica.commit_index > idx) { + break; + } + ret.push(replica.peer_address); } @@ -619,27 +624,25 @@ inventory::submit!(crate::InnerTest { assert_eq!(Storage::peer_by_instance_id("i6"), Ok(None)); } - let box_replication = |rsid: &str, idx: u64| { - let peer = traft::Peer { - replicaset_id: rsid.into(), - commit_index: idx, - ..Default::default() - }; - Storage::box_replication(&peer).unwrap() + let box_replication = |replicaset_id: &str, max_index: Option<u64>| { + Storage::box_replication(replicaset_id, max_index).unwrap() }; { - assert_eq!(box_replication("r1", 0), Vec::<&str>::new()); - assert_eq!(box_replication("XX", 99), Vec::<&str>::new()); + assert_eq!(box_replication("r1", Some(0)), Vec::<&str>::new()); + assert_eq!(box_replication("XX", None), Vec::<&str>::new()); - assert_eq!(box_replication("r1", 1), vec!["addr:1"]); - assert_eq!(box_replication("r1", 2), vec!["addr:1", "addr:2"]); - assert_eq!(box_replication("r1", 99), vec!["addr:1", "addr:2"]); + assert_eq!(box_replication("r1", Some(1)), vec!["addr:1"]); + assert_eq!(box_replication("r1", Some(2)), vec!["addr:1", "addr:2"]); + assert_eq!(box_replication("r1", Some(99)), vec!["addr:1", "addr:2"]); + assert_eq!(box_replication("r1", None), vec!["addr:1", "addr:2"]); - assert_eq!(box_replication("r2", 10), vec!["addr:3", "addr:4"]); - assert_eq!(box_replication("r2", 10), vec!["addr:3", "addr:4"]); + assert_eq!(box_replication("r2", Some(10)), vec!["addr:3", "addr:4"]); + assert_eq!(box_replication("r2", Some(10)), vec!["addr:3", "addr:4"]); + assert_eq!(box_replication("r2", None), vec!["addr:3", "addr:4"]); - assert_eq!(box_replication("r3", 10), vec!["addr:5"]); + assert_eq!(box_replication("r3", Some(10)), vec!["addr:5"]); + assert_eq!(box_replication("r3", None), vec!["addr:5"]); } raft_group.index("instance_id").unwrap().drop().unwrap(); @@ -656,7 +659,7 @@ inventory::submit!(crate::InnerTest { 
raft_group.index("replicaset_id").unwrap().drop().unwrap(); assert_err!( - Storage::box_replication(&traft::Peer::default()), + Storage::box_replication("", None), concat!( "unknown error", " no such index \"replicaset_id\"", diff --git a/test/int/test_joining.py b/test/int/test_joining.py index c4e437c19306865e365de7fc76616737bd892b9e..b2002a7a042e1548fdcf557c9b3d47b4c0bb99b4 100644 --- a/test/int/test_joining.py +++ b/test/int/test_joining.py @@ -1,6 +1,7 @@ from functools import partial import os import errno +import funcy # type: ignore import re import signal import pytest @@ -202,11 +203,15 @@ def test_replication(cluster2: Cluster): assert i1.replicaset_uuid() == i2.replicaset_uuid() + @funcy.retry(tries=20, timeout=0.1) + def wait_replicated(instance): + box_replication = instance.eval("return box.cfg.replication") + assert box_replication == [i1.listen, i2.listen], instance + for instance in cluster2.instances: with instance.connect(1) as conn: raft_peer = conn.select("raft_group", [instance.raft_id])[0] space_cluster = conn.select("_cluster") - cfg_replication = conn.eval("return box.cfg.replication") assert raft_peer[:-1] == [ instance.raft_id, @@ -223,11 +228,21 @@ def test_replication(cluster2: Cluster): [2, i2.instance_uuid()], ] - if instance == i1: - with pytest.raises(AssertionError): # FIXME - assert cfg_replication[0] == [i1.listen, i2.listen] - else: - assert cfg_replication[0] == [i1.listen, i2.listen] + wait_replicated(instance) + + # It doesn't affect replication setup + # but speeds up the test by eliminating failover. + i1.promote_or_fail() + + i2.assert_raft_status("Follower") + i2.restart() + wait_replicated(i2) + + i2.promote_or_fail() + + i1.assert_raft_status("Follower") + i1.restart() + wait_replicated(i1) def test_cluster_id_mismatch(instance: Instance):