diff --git a/src/args.rs b/src/args.rs index df66457988078eb9a8ae711a74d48f9cfe258949..bef897b3f272b65092c7d68c06af33075da887e9 100644 --- a/src/args.rs +++ b/src/args.rs @@ -50,9 +50,15 @@ pub struct Run { /// Execute tarantool (Lua) script pub tarantool_exec: Option<CString>, - #[clap(long, value_name = "name", env = "PICODATA_INSTANCE_ID")] + #[clap( + long, + value_name = "name", + default_value = "", + env = "PICODATA_INSTANCE_ID" + )] /// Name of the instance - pub instance_id: String, + /// Empty value means that instance_id of a Follower will be generated on the Leader + instance_id: String, #[clap( long = "advertise", @@ -147,6 +153,13 @@ impl Run { pub fn log_level(&self) -> SayLevel { self.log_level.into() } + + pub fn instance_id(&self) -> Option<String> { + match self.instance_id.as_str() { + "" => None, + any => Some(any.to_string()), + } + } } //////////////////////////////////////////////////////////////////////////////// @@ -279,13 +292,23 @@ mod tests { { let parsed = parse![Run,]; assert_eq!(parsed.instance_id, "instance-id-from-env"); + assert_eq!( + parsed.instance_id(), + Some("instance-id-from-env".to_string()) + ); assert_eq!(parsed.peers.as_ref(), vec!["localhost:3301"]); assert_eq!(parsed.listen, "localhost:3301"); // default assert_eq!(parsed.advertise_address(), "localhost:3301"); // default assert_eq!(parsed.log_level(), SayLevel::Info); // default let parsed = parse![Run, "--instance-id", "instance-id-from-args"]; - assert_eq!(parsed.instance_id, "instance-id-from-args"); + assert_eq!( + parsed.instance_id(), + Some("instance-id-from-args".to_string()) + ); + + let parsed = parse![Run, "--instance-id", ""]; + assert_eq!(parsed.instance_id(), None); } std::env::set_var("PICODATA_PEER", "peer-from-env"); @@ -300,6 +323,18 @@ mod tests { assert_eq!(parsed.peers.as_ref(), vec!["localhost:3302"]); } + std::env::set_var("PICODATA_INSTANCE_ID", ""); + { + let parsed = parse![Run,]; + assert_eq!(parsed.instance_id(), None); + } + + 
std::env::remove_var("PICODATA_INSTANCE_ID"); + { + let parsed = parse![Run,]; + assert_eq!(parsed.instance_id(), None); + } + std::env::set_var("PICODATA_LISTEN", "listen-from-env"); { let parsed = parse![Run,]; diff --git a/src/main.rs b/src/main.rs index 06a651a52a0b83580a7540b87d49e3b6d6710519..671e246e104b85f921e1f61927d3cf7a0a68f7e9 100644 --- a/src/main.rs +++ b/src/main.rs @@ -407,7 +407,7 @@ fn start_boot(args: &args::Run) { let peer = traft::topology::initial_peer( args.cluster_id.clone(), - args.instance_id.clone(), + args.instance_id(), args.replicaset_id.clone(), args.advertise_address(), ); @@ -475,7 +475,7 @@ fn start_join(args: &args::Run, leader_address: String) { let req = traft::JoinRequest { cluster_id: args.cluster_id.clone(), - instance_id: args.instance_id.clone(), + instance_id: args.instance_id(), replicaset_id: args.replicaset_id.clone(), voter: false, advertise_address: args.advertise_address(), @@ -598,7 +598,7 @@ fn postjoin(args: &args::Run) { tlog!(Warning, "initiating self-promotion of {me:?}"); let req = traft::JoinRequest { cluster_id: args.cluster_id.clone(), - instance_id: me.instance_id.clone(), + instance_id: Some(me.instance_id.clone()), replicaset_id: None, // TODO voter: true, advertise_address: args.advertise_address(), diff --git a/src/traft/mod.rs b/src/traft/mod.rs index a5dc5e3569d6ae1a1d850e2ebf87ca5aee4bcb8d..351ddeecf88022f742b80fc8f3655c9118f1e7df 100644 --- a/src/traft/mod.rs +++ b/src/traft/mod.rs @@ -307,15 +307,6 @@ pub enum TopologyRequest { Deactivate(DeactivateRequest), } -impl TopologyRequest { - pub fn instance_id(&self) -> &str { - match self { - Self::Join(JoinRequest { instance_id, .. }) - | Self::Deactivate(DeactivateRequest { instance_id, .. 
}) => instance_id, - } - } -} - impl From<JoinRequest> for TopologyRequest { fn from(j: JoinRequest) -> Self { Self::Join(j) @@ -333,7 +324,7 @@ impl From<DeactivateRequest> for TopologyRequest { #[derive(Clone, Debug, Serialize, Deserialize)] pub struct JoinRequest { pub cluster_id: String, - pub instance_id: String, + pub instance_id: Option<String>, pub replicaset_id: Option<String>, pub advertise_address: String, pub voter: bool, diff --git a/src/traft/node.rs b/src/traft/node.rs index bb419182feef5887e43c180afb797edf7ca40da3..4676d36a5b94f0897b662d8ed31a546f3d23aae1 100644 --- a/src/traft/node.rs +++ b/src/traft/node.rs @@ -665,8 +665,6 @@ fn raft_main_loop( fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { loop { let batch = inbox.receive_all(Duration::MAX); - let ids: Vec<_> = batch.iter().map(|(req, _)| req.instance_id()).collect(); - tlog!(Info, "processing batch: {ids:?}"); let term = Storage::term().unwrap().unwrap_or(0); let mut topology = match Storage::peers() { @@ -680,18 +678,37 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { } }; + let mut topology_results = vec![]; + for (req, notify) in &batch { - if let Err(e) = topology.process(req) { - let e = RaftError::ConfChangeError(e); - notify.try_send(Err(e)).expect("that's a bug"); + match topology.process(req) { + Ok(peer) => { + topology_results.push((notify, peer)); + } + Err(e) => { + let e = RaftError::ConfChangeError(e); + notify.try_send(Err(e)).expect("that's a bug"); + } } } + let topology_diff = topology.diff(); + let topology_to_replace = topology.to_replace(); + + let mut ids: Vec<String> = vec![]; + for peer in &topology_diff { + ids.push(peer.instance_id.clone()); + } + for (_, peer) in &topology_to_replace { + ids.push(peer.instance_id.clone()); + } + tlog!(Info, "processing batch: {ids:?}"); + let (rx, tx) = fiber::Channel::new(1).into_clones(); main_inbox.send(NormalRequest::ProposeConfChange { term, - peers: 
topology.diff(), - to_replace: topology.to_replace(), + peers: topology_diff, + to_replace: topology_to_replace, notify: tx, }); @@ -701,12 +718,12 @@ fn raft_join_loop(inbox: TopologyMailbox, main_inbox: Mailbox<NormalRequest>) { // after the node leaves the joint state. let res = rx.recv().expect("that's a bug"); tlog!(Info, "batch processed: {ids:?}, {res:?}"); - for (_, notify) in batch { - // RaftError doesn't implement the Clone trait, - // so we have to be creative. + for (notify, peer) in topology_results { match &res { - Ok(v) => notify.try_send(Ok(*v)).ok(), + Ok(_) => notify.try_send(Ok(peer.raft_id)).ok(), Err(e) => { + // RaftError doesn't implement the Clone trait, + // so we have to be creative. let e = RaftError::ConfChangeError(format!("{e}")); notify.try_send(Err(e)).ok() } @@ -757,11 +774,9 @@ fn raft_join(req: JoinRequest) -> Result<JoinResponse, Box<dyn StdError>> { })); } - let instance_id = req.instance_id.clone(); - node.change_topology(req)?; + let raft_id = node.change_topology(req)?; - let peer = Storage::peer_by_instance_id(&instance_id)? 
- .ok_or("the peer has misteriously disappeared")?; + let peer = Storage::peer_by_raft_id(raft_id)?.ok_or("the peer has misteriously disappeared")?; let raft_group = Storage::peers()?; let box_replication = Storage::box_replication(&peer.replicaset_id, Some(peer.commit_index))?; diff --git a/src/traft/topology.rs b/src/traft/topology.rs index 784741276abdc9372eae05ef1522d625fcb2c2a9..086e4883553290ee7f82a621c757c80a1cbeb955 100644 --- a/src/traft/topology.rs +++ b/src/traft/topology.rs @@ -58,9 +58,9 @@ impl Topology { .insert(peer.raft_id); } - fn peer_by_instance_id(&self, instance_id: &str) -> Option<&Peer> { + fn peer_by_instance_id(&self, instance_id: &str) -> Option<Peer> { let raft_id = self.instance_id_map.get(instance_id)?; - self.peers.get(raft_id) + self.peers.get(raft_id).cloned() } fn choose_replicaset_id(&self) -> String { @@ -80,67 +80,83 @@ impl Topology { } } - pub fn join(&mut self, req: &JoinRequest) -> Result<(), String> { - if let Some(peer) = self.peer_by_instance_id(&req.instance_id) { - match &req.replicaset_id { - Some(replicaset_id) if replicaset_id != &peer.replicaset_id => { - let e = format!( - std::concat!( - "{} already joined with a different replicaset_id,", - " requested: {},", - " existing: {}.", - ), - req.instance_id, replicaset_id, peer.replicaset_id - ); - return Err(e); - } - _ => (), - } - - let mut peer = peer.clone(); - peer.peer_address = req.advertise_address.clone(); - peer.voter = req.voter; - peer.is_active = true; + fn choose_instance_id(instance_id: Option<String>, raft_id: u64) -> String { + match instance_id { + Some(v) => v, + None => format!("i{raft_id}"), + } + } - if req.voter { - self.diff.insert(peer.raft_id); - } else { - let old_raft_id = peer.raft_id; - peer.raft_id = self.max_raft_id + 1; + pub fn join(&mut self, req: &JoinRequest) -> Result<Peer, String> { + match &req.instance_id { + Some(instance_id) => match self.peer_by_instance_id(instance_id) { + Some(peer) => self.modify_existing_instance(peer, 
req), + None => self.join_new_instance(req), + }, + None => self.join_new_instance(req), + } + } - self.to_replace.insert(peer.raft_id, old_raft_id); + fn modify_existing_instance(&mut self, peer: Peer, req: &JoinRequest) -> Result<Peer, String> { + match &req.replicaset_id { + Some(replicaset_id) if replicaset_id != &peer.replicaset_id => { + let e = format!( + std::concat!( + "{} already joined with a different replicaset_id,", + " requested: {},", + " existing: {}.", + ), + peer.instance_id, replicaset_id, peer.replicaset_id + ); + return Err(e); } - self.put_peer(peer); + _ => (), + } - return Ok(()); - } else { - let raft_id = self.max_raft_id + 1; - let replicaset_id = match &req.replicaset_id { - Some(v) => v.clone(), - None => self.choose_replicaset_id(), - }; - let replicaset_uuid = replicaset_uuid(&replicaset_id); + let mut peer = peer; + peer.peer_address = req.advertise_address.clone(); + peer.voter = req.voter; + peer.is_active = true; - let peer = Peer { - raft_id, - instance_id: req.instance_id.clone(), - replicaset_id, - commit_index: INVALID_INDEX, - instance_uuid: instance_uuid(&req.instance_id), - replicaset_uuid, - peer_address: req.advertise_address.clone(), - voter: req.voter, - is_active: true, - }; + if req.voter { + self.diff.insert(peer.raft_id); + } else { + let old_raft_id = peer.raft_id; + peer.raft_id = self.max_raft_id + 1; - self.diff.insert(raft_id); - self.put_peer(peer); + self.to_replace.insert(peer.raft_id, old_raft_id); } + self.put_peer(peer.clone()); + Ok(peer) + } + + fn join_new_instance(&mut self, req: &JoinRequest) -> Result<Peer, String> { + let raft_id = self.max_raft_id + 1; + let replicaset_id = match &req.replicaset_id { + Some(v) => v.clone(), + None => self.choose_replicaset_id(), + }; + let replicaset_uuid = replicaset_uuid(&replicaset_id); + let instance_id = Self::choose_instance_id(req.instance_id.clone(), raft_id); - Ok(()) + let peer = Peer { + raft_id, + instance_id: instance_id.clone(), + replicaset_id, + 
commit_index: INVALID_INDEX, + instance_uuid: instance_uuid(&instance_id), + replicaset_uuid, + peer_address: req.advertise_address.clone(), + voter: req.voter, + is_active: true, + }; + + self.diff.insert(raft_id); + self.put_peer(peer.clone()); + Ok(peer) } - pub fn deactivate(&mut self, req: &DeactivateRequest) -> Result<(), String> { + pub fn deactivate(&mut self, req: &DeactivateRequest) -> Result<Peer, String> { let peer = match self.peer_by_instance_id(&req.instance_id) { Some(peer) => peer, None => { @@ -154,17 +170,17 @@ impl Topology { let peer = Peer { voter: false, is_active: false, - ..peer.clone() + ..peer }; self.diff.insert(peer.raft_id); // no need to call put_peer, as the peer was already in the cluster - self.peers.insert(peer.raft_id, peer); + self.peers.insert(peer.raft_id, peer.clone()); - Ok(()) + Ok(peer) } - pub fn process(&mut self, req: &TopologyRequest) -> Result<(), String> { + pub fn process(&mut self, req: &TopologyRequest) -> Result<Peer, String> { match req { TopologyRequest::Join(join) => self.join(join), TopologyRequest::Deactivate(deactivate) => self.deactivate(deactivate), @@ -197,7 +213,7 @@ impl Topology { // Create first peer in the cluster pub fn initial_peer( cluster_id: String, - instance_id: String, + instance_id: Option<String>, replicaset_id: Option<String>, advertise_address: String, ) -> Peer { @@ -271,7 +287,7 @@ mod tests { ) => { &crate::traft::TopologyRequest::Join(crate::traft::JoinRequest { cluster_id: "cluster1".into(), - instance_id: $instance_id.into(), + instance_id: Some($instance_id.into()), replicaset_id: $replicaset_id.map(|v: &str| v.into()), advertise_address: $advertise_address.into(), voter: $voter, diff --git a/test/int/conftest.py b/test/int/conftest.py index 453e03ef873a1775fddf5663882a736a9b58c388..c70c54445d67e9ab2a380b4136e06e5d257ed0e5 100644 --- a/test/int/conftest.py +++ b/test/int/conftest.py @@ -149,6 +149,8 @@ assert color.intense_red("text") == "\x1b[31;1mtext\x1b[0m" OUT_LOCK = 
threading.Lock() +POSITION_IN_SPACE_INSTANCE_ID = 3 + @dataclass class Instance: @@ -332,6 +334,11 @@ class Instance: status = self._raft_status() assert status.is_ready self.raft_id = status.id + with self.connect(timeout=1) as conn: + self.instance_id = conn.space("raft_group").select((self.raft_id,))[0][ + POSITION_IN_SPACE_INSTANCE_ID + ] + eprint(f"{self.instance_id=}") eprint(f"{self} is ready") @funcy.retry(tries=4, timeout=0.1, errors=AssertionError) @@ -380,11 +387,15 @@ class Cluster: def __getitem__(self, item: int) -> Instance: return self.instances[item] - def deploy(self, *, instance_count: int) -> list[Instance]: + def deploy( + self, *, instance_count: int, generate_instance_id=True + ) -> list[Instance]: assert not self.instances, "Already deployed" for i in range(instance_count): - self.add_instance(wait_ready=False) + self.add_instance( + wait_ready=False, generate_instance_id=generate_instance_id + ) for instance in self.instances: instance.start() @@ -395,13 +406,15 @@ class Cluster: eprint(f" {self} deployed ".center(80, "=")) return self.instances - def add_instance(self, wait_ready=True, peers=None) -> Instance: + def add_instance( + self, wait_ready=True, peers=None, generate_instance_id=True + ) -> Instance: i = 1 + len(self.instances) instance = Instance( binary_path=self.binary_path, cluster_id=self.id, - instance_id=f"i{i}", + instance_id=f"i{i}" if generate_instance_id else "", data_dir=f"{self.data_dir}/i{i}", host=self.base_host, port=self.base_port + i, diff --git a/test/int/test_joining.py b/test/int/test_joining.py index 4ba64a68d172bc55e8dbf7f97d3635443e4dd0d6..fcbdfa3f8829fef4a6632f2ddc7aabeb83d2b71f 100644 --- a/test/int/test_joining.py +++ b/test/int/test_joining.py @@ -277,3 +277,19 @@ def test_rebootstrap_follower(cluster3: Cluster): i3.restart(remove_data=True) i3.wait_ready() i3.assert_raft_status("Follower") + + +def test_join_without_explicit_instance_id(cluster: Cluster): + # Scenario: bootstrap single instance without 
explicitly given instance id + # Given no instances started + # When two instances start without instance_id given + # Then one of the instances becomes Leader with instance_id "i1" + # And the second one becomes Follower with instance_id "i2" + + cluster.deploy(instance_count=2, generate_instance_id=False) + + i1, i2 = cluster.instances + i1.assert_raft_status("Leader") + assert i1.instance_id == "i1" + i2.assert_raft_status("Follower") + assert i2.instance_id == "i2"