Skip to content
Snippets Groups Projects
Commit f8e30cd1 authored by Georgy Moshkin's avatar Georgy Moshkin :speech_balloon: Committed by Georgy Moshkin
Browse files

refactor: remove some redundant cloning and sorting of sorted trees

parent 5cd2ff3a
No related branches found
No related tags found
1 merge request!82Feature/discovery/when raft initialized
......@@ -959,7 +959,7 @@ checksum = "f764005d11ee5f36500a149ace24e00e3da98b0158b3e2d53a7495660d3f4d60"
[[package]]
name = "tarantool"
version = "0.6.1"
version = "0.6.2"
dependencies = [
"base64",
"bitflags",
......@@ -987,7 +987,7 @@ dependencies = [
[[package]]
name = "tarantool-proc"
version = "0.1.0"
version = "0.1.1"
dependencies = [
"proc-macro2",
"quote",
......
use ::tarantool::fiber::{mutex::MutexGuard, Mutex};
use ::tarantool::proc;
use ::tarantool::uuid::Uuid;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::collections::BTreeSet;
use std::error::Error as StdError;
......@@ -17,6 +16,15 @@ pub enum Role {
NonLeader { leader: Address },
}
impl Role {
fn leader_address(&self) -> &Address {
match self {
Self::Leader { address } => address,
Self::NonLeader { leader } => leader,
}
}
}
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct LeaderElection {
tmp_id: String,
......@@ -42,50 +50,43 @@ pub struct Discovery {
}
impl Discovery {
fn new(tmp_id: impl AsRef<str>, peers: impl IntoIterator<Item = impl Into<Address>>) -> Self {
fn new(tmp_id: impl Into<String>, peers: impl IntoIterator<Item = impl Into<Address>>) -> Self {
// BTree and sorting for deterministic results and simpler asserts tests.
let peers: BTreeSet<Address> = peers.into_iter().map(Into::into).sorted().collect();
let peers: BTreeSet<Address> = peers.into_iter().map(Into::into).collect();
assert!(!peers.is_empty(), "peers should not be empty");
Self {
pending_request: false,
visited: [].into(),
address: None,
state: State::LeaderElection(LeaderElection {
tmp_id: tmp_id.as_ref().into(),
tmp_id: tmp_id.into(),
peers,
}),
}
}
fn handle_request(&mut self, request: Request, to: &Address) -> Response {
match (&mut self.state, request) {
(State::Done(_), _) => {} // done we are
(
State::LeaderElection(LeaderElection { tmp_id, peers }),
Request {
tmp_id: request_tmp_id,
peers: request_peers,
},
) => {
if !request_peers.is_subset(peers) {
fn handle_request(&mut self, request: Request, to: Address) -> &Response {
match &mut self.state {
State::Done(_) => {} // done we are
State::LeaderElection(LeaderElection { tmp_id, peers }) => {
if !request.peers.is_subset(peers) {
// found a new peer
self.visited.clear()
}
peers.extend(request_peers.iter().cloned());
peers.extend(request.peers);
if tmp_id == &request_tmp_id {
if tmp_id == &request.tmp_id {
match &self.address {
Some(address) => {
if address != to {
todo!("current peer is reachable by multiple addresses")
}
Some(address) if address != &to => {
todo!("current peer is reachable by multiple addresses")
}
None => self.address = Some(to.clone()),
Some(_) => {}
None => self.address = Some(to),
};
}
}
}
self.state.clone()
&self.state
}
fn handle_response(&mut self, response: Response) {
......@@ -107,7 +108,6 @@ impl Discovery {
if peers.is_subset(&self.visited)
&& peers
.iter()
.sorted()
.next()
.expect("not expected peer_addresses to be empty")
== address
......@@ -120,19 +120,9 @@ impl Discovery {
}
}
}
(
State::LeaderElection { .. },
Response::Done(
Role::Leader {
address: leader_address,
}
| Role::NonLeader {
leader: leader_address,
},
),
) => {
(State::LeaderElection { .. }, Response::Done(role)) => {
self.state = State::Done(Role::NonLeader {
leader: leader_address,
leader: role.leader_address().into(),
});
self.visited.clear();
self.address = None;
......@@ -157,7 +147,7 @@ impl Discovery {
self.visited.insert(addr.clone());
addr
}
None => peers.iter().sorted().next().unwrap(), // peers is not empty
None => peers.iter().next().unwrap(), // peers is not empty
};
self.pending_request = true;
Some((le.clone(), addr.clone()))
......@@ -166,15 +156,15 @@ impl Discovery {
}
}
}
static mut DISCOVERY: &Option<Mutex<Discovery>> = &None;
static mut DISCOVERY: Option<Box<Mutex<Discovery>>> = None;
fn discovery() -> Option<MutexGuard<'static, Discovery>> {
unsafe { DISCOVERY }.as_ref().map(|d| d.lock())
unsafe { DISCOVERY.as_ref() }.map(|d| d.lock())
}
pub fn init_global(peers: impl IntoIterator<Item = impl Into<Address>>) {
let d = Discovery::new(Uuid::random().to_string(), peers);
unsafe { DISCOVERY = Box::leak(Box::new(Some(Mutex::new(d)))) }
unsafe { DISCOVERY = Some(Box::new(Mutex::new(d))) }
}
pub fn wait_global() -> Role {
......@@ -196,28 +186,23 @@ pub fn wait_global() -> Role {
}
#[proc]
fn proc_discover(request: Request, request_to: Address) -> Result<Response, Box<dyn StdError>> {
let mut discovery = discovery().ok_or("discovery uninitialized")?;
Ok(discovery.handle_request(request, &request_to))
fn proc_discover<'a>(
#[inject(&mut discovery())] discovery: &'a mut Option<MutexGuard<'a, Discovery>>,
request: Request,
request_to: Address,
) -> Result<&'a Response, Box<dyn StdError>> {
let discovery = discovery.as_mut().ok_or("discovery uninitialized")?;
Ok(discovery.handle_request(request, request_to))
}
#[cfg(test)]
mod tests {
use itertools::Itertools;
use std::collections::{BTreeMap, HashMap};
use super::*;
use rand::prelude::*;
impl Role {
fn leader(&self) -> Address {
(match self {
Role::Leader { address: l } => l,
Role::NonLeader { leader: l } => l,
})
.clone()
}
}
fn run(
instances: impl IntoIterator<Item = (impl Into<Address>, Discovery)>,
) -> HashMap<Address, Role> {
......@@ -247,7 +232,7 @@ mod tests {
match network.pop() {
Some(Event::Request(src, request, dst)) => {
let peer = instances.get_mut(&dst).unwrap();
let response = peer.handle_request(request, &dst).clone();
let response = peer.handle_request(request, dst).clone();
network.push(Event::Response(response, src))
}
Some(Event::Response(response, dst)) => {
......@@ -275,7 +260,7 @@ mod tests {
];
let res = run(instances);
assert!(
res.values().map(Role::leader).all_equal(),
res.values().map(Role::leader_address).all_equal(),
"multiple leaders: {:#?}",
res
);
......@@ -292,7 +277,7 @@ mod tests {
];
let res = run(instances);
assert!(
res.values().map(Role::leader).all_equal(),
res.values().map(Role::leader_address).all_equal(),
"multiple leaders: {:#?}",
res
);
......@@ -312,7 +297,7 @@ mod tests {
];
let res = run(instances);
assert!(
res.values().map(Role::leader).all_equal(),
res.values().map(Role::leader_address).all_equal(),
"multiple leaders: {:#?}",
res
);
......
......@@ -671,7 +671,7 @@ fn raft_join_loop(inbox: Mailbox<(JoinRequest, Notify)>, main_inbox: Mailbox<Nor
}
}
static mut RAFT_NODE: Option<&'static Node> = None;
static mut RAFT_NODE: Option<Box<Node>> = None;
pub fn set_global(node: Node) {
unsafe {
......@@ -679,7 +679,7 @@ pub fn set_global(node: Node) {
RAFT_NODE.is_none(),
"discovery::set_global() called twice, it's a leak"
);
RAFT_NODE = Some(Box::leak(Box::new(node)));
RAFT_NODE = Some(Box::new(node));
}
}
......@@ -688,7 +688,7 @@ pub fn global() -> Result<&'static Node, Error> {
// place while the instance is executing `start_discover()` function.
// It has already started listening, but the node is only initialized
// in `postjoin()`.
unsafe { RAFT_NODE }.ok_or(Error::Uninitialized)
unsafe { RAFT_NODE.as_deref() }.ok_or(Error::Uninitialized)
}
#[proc(packed_args)]
......
Subproject commit 994bab9b18fd27b65c32148236e5ec5bc02735a5
Subproject commit bd3594d60c3418448b44a80cc07b73653646e7f4
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment